Register streaming loaders in Storage::Downloader.

This commit is contained in:
John Preston 2019-04-12 14:50:41 +04:00
parent cca906d383
commit b2895a39ed
11 changed files with 158 additions and 98 deletions

View File

@ -417,8 +417,8 @@ AuthSession::AuthSession(const MTPUser &user)
: _autoLockTimer([this] { checkAutoLock(); })
, _api(std::make_unique<ApiWrap>(this))
, _calls(std::make_unique<Calls::Instance>())
, _downloader(std::make_unique<Storage::Downloader>())
, _uploader(std::make_unique<Storage::Uploader>())
, _downloader(std::make_unique<Storage::Downloader>(_api.get()))
, _uploader(std::make_unique<Storage::Uploader>(_api.get()))
, _storage(std::make_unique<Storage::Facade>())
, _notifications(std::make_unique<Window::Notifications::System>(this))
, _data(std::make_unique<Data::Session>(this))

View File

@ -1240,7 +1240,7 @@ auto DocumentData::createStreamingLoader(
}
return hasRemoteLocation()
? std::make_unique<Media::Streaming::LoaderMtproto>(
&session().api(),
&session().downloader(),
StorageFileLocation(
_dc,
session().userId(),

View File

@ -21,16 +21,23 @@ constexpr auto kMaxConcurrentRequests = 4;
} // namespace
LoaderMtproto::LoaderMtproto(
not_null<ApiWrap*> api,
not_null<Storage::Downloader*> owner,
const StorageFileLocation &location,
int size,
Data::FileOrigin origin)
: _api(api)
: _owner(owner)
, _location(location)
, _dcId(location.dcId())
, _size(size)
, _origin(origin) {
}
LoaderMtproto::~LoaderMtproto() {
for (const auto [index, amount] : _amountByDcIndex) {
changeRequestedAmount(index, -amount);
}
}
std::optional<Storage::Cache::Key> LoaderMtproto::baseCacheKey() const {
return _location.bigFileBaseCacheKey();
}
@ -97,6 +104,11 @@ void LoaderMtproto::increasePriority() {
});
}
void LoaderMtproto::changeRequestedAmount(int index, int amount) {
_owner->requestedAmountIncrement(_dcId, index, amount);
_amountByDcIndex[index] += amount;
}
void LoaderMtproto::sendNext() {
if (_requests.size() >= kMaxConcurrentRequests) {
return;
@ -106,20 +118,23 @@ void LoaderMtproto::sendNext() {
return;
}
static auto DcIndex = 0;
const auto index = _owner->chooseDcIndexForRequest(_dcId);
changeRequestedAmount(index, kPartSize);
const auto usedFileReference = _location.fileReference();
const auto id = _sender.request(MTPupload_GetFile(
_location.tl(Auth().userId()),
MTP_int(offset),
MTP_int(kPartSize)
)).done([=](const MTPupload_File &result) {
changeRequestedAmount(index, -kPartSize);
requestDone(offset, result);
}).fail([=](const RPCError &error) {
changeRequestedAmount(index, -kPartSize);
requestFailed(offset, error, usedFileReference);
}).toDC(MTP::downloadDcId(
_location.dcId(),
(++DcIndex) % MTP::kDownloadSessionsCount
)).send();
}).toDC(
MTP::downloadDcId(_dcId, index)
).send();
_requests.emplace(offset, id);
sendNext();
@ -175,14 +190,14 @@ void LoaderMtproto::requestFailed(
sendNext();
}
};
_api->refreshFileReference(_origin, crl::guard(this, callback));
_owner->api().refreshFileReference(
_origin,
crl::guard(this, callback));
}
rpl::producer<LoadedPart> LoaderMtproto::parts() const {
return _parts.events();
}
LoaderMtproto::~LoaderMtproto() = default;
} // namespace Streaming
} // namespace Media

View File

@ -11,7 +11,9 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "mtproto/sender.h"
#include "data/data_file_origin.h"
class ApiWrap;
namespace Storage {
class Downloader;
} // namespace Storage
namespace Media {
namespace Streaming {
@ -19,10 +21,11 @@ namespace Streaming {
class LoaderMtproto : public Loader, public base::has_weak_ptr {
public:
LoaderMtproto(
not_null<ApiWrap*> api,
not_null<Storage::Downloader*> owner,
const StorageFileLocation &location,
int size,
Data::FileOrigin origin);
~LoaderMtproto();
[[nodiscard]] auto baseCacheKey() const
-> std::optional<Storage::Cache::Key> override;
@ -40,8 +43,6 @@ public:
Storage::StreamedFileDownloader *downloader) override;
void clearAttachedDownloader() override;
~LoaderMtproto();
private:
void sendNext();
@ -58,11 +59,13 @@ private:
const QByteArray &encryptionIV,
const QVector<MTPFileHash> &hashes);
void cancelForOffset(int offset);
void changeRequestedAmount(int index, int amount);
const not_null<ApiWrap*> _api;
const not_null<Storage::Downloader*> _owner;
// _location can be changed with an updated file_reference.
StorageFileLocation _location;
MTP::DcId _dcId = 0;
const int _size = 0;
const Data::FileOrigin _origin;
@ -71,6 +74,7 @@ private:
PriorityQueue _requested;
base::flat_map<int, mtpRequestId> _requests;
base::flat_map<int, int> _amountByDcIndex;
rpl::event_stream<LoadedPart> _parts;
Storage::StreamedFileDownloader *_downloader = nullptr;

View File

@ -26,6 +26,7 @@ constexpr auto kSlicesInMemory = 2;
// 1 MB of parts are requested from cloud ahead of reading demand.
constexpr auto kPreloadPartsAhead = 8;
constexpr auto kDownloaderRequestsLimit = 4;
using PartsMap = base::flat_map<int, QByteArray>;
@ -591,18 +592,16 @@ bool Reader::Slices::waitingForHeaderCache() const {
return (_header.flags & Slice::Flag::LoadingFromCache);
}
std::optional<int> Reader::Slices::readCacheRequiredFor(int offset) {
bool Reader::Slices::readCacheForDownloaderRequired(int offset) {
Expects(offset < _size);
Expects(!waitingForHeaderCache());
if (isFullInHeader()) {
return std::nullopt;
return false;
}
const auto index = offset / kInSlice;
auto &slice = _data[index];
return (slice.flags & Slice::Flag::LoadedFromCache)
? std::nullopt
: std::make_optional(index + 1);
return !(slice.flags & Slice::Flag::LoadedFromCache);
}
void Reader::Slices::markSliceUsed(int sliceIndex) {
@ -884,35 +883,39 @@ void Reader::checkForDownloaderChange(int checkItemsCount) {
_offsetsForDownloader.erase(
begin(_offsetsForDownloader),
changed + 1);
_downloaderSliceNumber = 0;
_downloaderSliceCache = std::nullopt;
_downloaderReadCache.clear();
_downloaderOffsetAcks.take();
}
}
void Reader::checkForDownloaderReadyOffsets() {
// If a requested part is available right now we simply fire it on the
// main thread, until the first not-available-right-now offset is found.
const auto ready = [&](int offset, QByteArray &&bytes) {
const auto unavailableInBytes = [&](int offset, QByteArray &&bytes) {
if (bytes.isEmpty()) {
return true;
}
crl::on_main(this, [=, bytes = std::move(bytes)]() mutable {
_partsForDownloader.fire({ offset, std::move(bytes) });
});
return true;
return false;
};
const auto unavailableInCache = [&](int offset) {
const auto index = (offset / kInSlice);
const auto sliceNumber = index + 1;
const auto i = _downloaderReadCache.find(sliceNumber);
if (i == end(_downloaderReadCache) || !i->second) {
return true;
}
const auto j = i->second->find(offset - index * kInSlice);
if (j == end(*i->second)) {
return true;
}
return unavailableInBytes(offset, std::move(j->second));
};
const auto unavailable = [&](int offset) {
auto bytes = _slices.partForDownloader(offset);
if (!bytes.isEmpty()) {
return !ready(offset, std::move(bytes));
}
const auto sliceIndex = (offset / kInSlice);
if ((sliceIndex + 1 == _downloaderSliceNumber)
&& _downloaderSliceCache) {
const auto i = _downloaderSliceCache->find(
offset - sliceIndex * kInSlice);
if (i != _downloaderSliceCache->end()) {
return !ready(offset, std::move(i->second));
}
}
return true;
return unavailableInBytes(offset, _slices.partForDownloader(offset))
&& unavailableInCache(offset);
};
_offsetsForDownloader.erase(
begin(_offsetsForDownloader),
@ -923,22 +926,42 @@ void Reader::processDownloaderRequests() {
processCacheResults();
enqueueDownloaderOffsets();
checkForDownloaderReadyOffsets();
if (empty(_offsetsForDownloader)) {
return;
pruneDoneDownloaderRequests();
if (!empty(_offsetsForDownloader)) {
pruneDownloaderCache(_offsetsForDownloader.front());
sendDownloaderRequests();
}
}
const auto offset = _offsetsForDownloader.front();
if (_cacheHelper && downloaderWaitForCachedSlice(offset)) {
return;
}
void Reader::pruneDownloaderCache(int minimalOffset) {
const auto minimalSliceNumber = (minimalOffset / kInSlice) + 1;
const auto removeTill = ranges::lower_bound(
_downloaderReadCache,
minimalSliceNumber,
ranges::less(),
&base::flat_map<int, std::optional<PartsMap>>::value_type::first);
_downloaderReadCache.erase(_downloaderReadCache.begin(), removeTill);
}
void Reader::pruneDoneDownloaderRequests() {
for (const auto done : _downloaderOffsetAcks.take()) {
_downloaderOffsetsRequested.remove(done);
const auto i = ranges::find(_offsetsForDownloader, done);
if (i != end(_offsetsForDownloader)) {
_offsetsForDownloader.erase(i);
}
}
}
_offsetsForDownloader.pop_front();
if (_downloaderOffsetsRequested.emplace(offset).second) {
_loader->load(offset);
void Reader::sendDownloaderRequests() {
auto &&offsets = ranges::view::all(
_offsetsForDownloader
) | ranges::view::take(kDownloaderRequestsLimit);
for (const auto offset : offsets) {
if ((!_cacheHelper || !downloaderWaitForCachedSlice(offset))
&& _downloaderOffsetsRequested.emplace(offset).second) {
_loader->load(offset);
}
}
}
@ -946,20 +969,22 @@ bool Reader::downloaderWaitForCachedSlice(int offset) {
if (_slices.waitingForHeaderCache()) {
return true;
}
const auto sliceNumber = _slices.readCacheRequiredFor(offset);
if (sliceNumber.value_or(0) != _downloaderSliceNumber) {
_downloaderSliceNumber = sliceNumber.value_or(0);
_downloaderSliceCache = std::nullopt;
if (_downloaderSliceNumber) {
if (readFromCacheForDownloader()) {
return true;
}
_downloaderSliceCache = PartsMap();
}
} else if (_downloaderSliceNumber && !_downloaderSliceCache) {
return true;
if (!_slices.readCacheForDownloaderRequired(offset)) {
return false;
}
return false;
const auto sliceNumber = (offset / kInSlice) + 1;
auto i = _downloaderReadCache.find(sliceNumber);
if (i == _downloaderReadCache.end()) {
// If we didn't request that slice yet, try requesting it.
// If there is no need to (header mode is unknown) - place empty map.
// Otherwise place std::nullopt and wait for the cache result.
i = _downloaderReadCache.emplace(
sliceNumber,
(readFromCacheForDownloader(sliceNumber)
? std::nullopt
: std::make_optional(PartsMap()))).first;
}
return !i->second;
}
void Reader::checkCacheResultsForDownloader() {
@ -1018,14 +1043,14 @@ void Reader::readFromCache(int sliceNumber) {
});
}
bool Reader::readFromCacheForDownloader() {
bool Reader::readFromCacheForDownloader(int sliceNumber) {
Expects(_cacheHelper != nullptr);
Expects(_downloaderSliceNumber > 0);
Expects(sliceNumber > 0);
if (_slices.headerModeUnknown()) {
return false;
}
readFromCache(_downloaderSliceNumber);
readFromCache(sliceNumber);
return true;
}
@ -1153,10 +1178,12 @@ bool Reader::processCacheResults() {
auto loaded = base::take(_cacheHelper->results);
lock.unlock();
if (_downloaderSliceNumber) {
const auto i = loaded.find(_downloaderSliceNumber);
if (i != end(loaded)) {
_downloaderSliceCache = i->second;
for (auto &[sliceNumber, cachedParts] : _downloaderReadCache) {
if (!cachedParts) {
const auto i = loaded.find(sliceNumber);
if (i != end(loaded)) {
cachedParts = i->second;
}
}
}

View File

@ -146,7 +146,7 @@ private:
[[nodiscard]] SerializedSlice unloadToCache();
[[nodiscard]] QByteArray partForDownloader(int offset) const;
[[nodiscard]] std::optional<int> readCacheRequiredFor(int offset);
[[nodiscard]] bool readCacheForDownloaderRequired(int offset);
private:
enum class HeaderMode {
@ -182,7 +182,7 @@ private:
// 0 is for headerData, slice index = sliceNumber - 1.
// returns false if asked for a known-empty downloader slice cache.
void readFromCache(int sliceNumber);
[[nodiscard]] bool readFromCacheForDownloader();
[[nodiscard]] bool readFromCacheForDownloader(int sliceNumber);
bool processCacheResults();
void putToCache(SerializedSlice &&data);
@ -199,6 +199,9 @@ private:
void processDownloaderRequests();
void checkCacheResultsForDownloader();
void pruneDownloaderCache(int minimalOffset);
void pruneDoneDownloaderRequests();
void sendDownloaderRequests();
[[nodiscard]] bool downloaderWaitForCachedSlice(int offset);
void enqueueDownloaderOffsets();
void checkForDownloaderChange(int checkItemsCount);
@ -221,17 +224,24 @@ private:
// Even if streaming had failed, the Reader can work for the downloader.
std::optional<Error> _streamingError;
Storage::StreamedFileDownloader *_attachedDownloader = nullptr;
base::thread_safe_queue<int> _downloaderOffsetRequests;
base::thread_safe_queue<int> _downloaderOffsetAcks;
std::deque<int> _offsetsForDownloader;
base::flat_set<int> _downloaderOffsetsRequested;
int _downloaderSliceNumber = 0; // > 0 means we want it from cache.
std::optional<PartsMap> _downloaderSliceCache;
// In case streaming is active both main and streaming threads have work.
// In case only downloader is active, all work is done on main thread.
// Main thread.
Storage::StreamedFileDownloader *_attachedDownloader = nullptr;
rpl::event_stream<LoadedPart> _partsForDownloader;
bool _streamingActive = false;
// Streaming thread.
std::deque<int> _offsetsForDownloader;
base::flat_set<int> _downloaderOffsetsRequested;
base::flat_map<int, std::optional<PartsMap>> _downloaderReadCache;
// Communication from main thread to streaming thread.
// Streaming thread to main thread communicates using crl::on_main.
base::thread_safe_queue<int> _downloaderOffsetRequests;
base::thread_safe_queue<int> _downloaderOffsetAcks;
rpl::lifetime _lifetime;
};

View File

@ -36,8 +36,9 @@ constexpr auto kMaxWebFileQueries = 8;
} // namespace
Downloader::Downloader()
: _killDownloadSessionsTimer([=] { killDownloadSessions(); })
Downloader::Downloader(not_null<ApiWrap*> api)
: _api(api)
, _killDownloadSessionsTimer([=] { killDownloadSessions(); })
, _queueForWeb(kMaxWebFileQueries) {
}
@ -48,14 +49,16 @@ void Downloader::clearPriorities() {
void Downloader::requestedAmountIncrement(MTP::DcId dcId, int index, int amount) {
Expects(index >= 0 && index < MTP::kDownloadSessionsCount);
using namespace rpl::mappers;
auto it = _requestedBytesAmount.find(dcId);
if (it == _requestedBytesAmount.cend()) {
it = _requestedBytesAmount.emplace(dcId, RequestedInDc { { 0 } }).first;
}
it->second[index] += amount;
if (it->second[index]) {
if (amount > 0) {
killDownloadSessionsStop(dcId);
} else {
} else if (ranges::find_if(it->second, _1 > 0) == end(it->second)) {
killDownloadSessionsStart(dcId);
}
}

View File

@ -12,6 +12,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "base/binary_guard.h"
#include "data/data_file_origin.h"
class ApiWrap;
namespace Storage {
namespace Cache {
struct Key;
@ -35,9 +37,13 @@ public:
FileLoader *end = nullptr;
};
Downloader();
explicit Downloader(not_null<ApiWrap*> api);
~Downloader();
ApiWrap &api() const {
return *_api;
}
int currentPriority() const {
return _priority;
}
@ -58,6 +64,8 @@ private:
void killDownloadSessionsStop(MTP::DcId dcId);
void killDownloadSessions();
not_null<ApiWrap*> _api;
base::Observable<void> _taskFinishedObservable;
int _priority = 1;

View File

@ -140,7 +140,8 @@ const QString &Uploader::File::filename() const {
return file ? file->filename : media.filename;
}
Uploader::Uploader() {
Uploader::Uploader(not_null<ApiWrap*> api)
: _api(api) {
nextTimer.setSingleShot(true);
connect(&nextTimer, SIGNAL(timeout()), this, SLOT(sendNext()));
stopSessionsTimer.setSingleShot(true);

View File

@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
struct FileLoadResult;
struct SendMediaReady;
class ApiWrap;
namespace Storage {
@ -53,7 +54,9 @@ class Uploader : public QObject, public RPCSender {
Q_OBJECT
public:
Uploader();
explicit Uploader(not_null<ApiWrap*> api);
~Uploader();
void uploadMedia(const FullMsgId &msgId, const SendMediaReady &image);
void upload(
const FullMsgId &msgId,
@ -96,8 +99,6 @@ public:
return _secureFailed.events();
}
~Uploader();
public slots:
void unpause();
void sendNext();
@ -111,6 +112,7 @@ private:
void currentFailed();
not_null<ApiWrap*> _api;
base::flat_map<mtpRequestId, QByteArray> requestsSent;
base::flat_map<mtpRequestId, int32> docRequestsSent;
base::flat_map<mtpRequestId, int32> dcMap;

View File

@ -125,11 +125,6 @@ bool StreamedFileDownloader::loadPart() {
_nextPartIndex = index + 1;
_reader->loadForDownloader(this, index * kPartSize);
AssertIsDebug();
//_downloader->requestedAmountIncrement(
// requestData.dcId,
// requestData.dcIndex,
// kPartSize);
++_partsRequested;
++_queue->queriesCount;
@ -154,11 +149,6 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) {
++_partsSaved;
if (index < _nextPartIndex) {
AssertIsDebug();
//_downloader->requestedAmountIncrement(
// requestData.dcId,
// requestData.dcIndex,
// -kPartSize);
--_partsRequested;
--_queue->queriesCount;
}