Load file parts even when not streaming.

This commit is contained in:
John Preston 2019-04-11 17:07:48 +04:00
parent 1ee4dac4f3
commit 27018d94ee
5 changed files with 151 additions and 46 deletions

View File

@ -36,24 +36,24 @@ private:
};
template <typename T>
template <typename T, template<typename...> typename Container = std::deque>
class thread_safe_queue {
public:
template <typename ...Args>
void emplace(Args &&...args) {
_wrap.with([&](std::vector<T> &value) {
_wrap.with([&](Container<T> &value) {
value.emplace_back(std::forward<Args>(args)...);
});
}
std::vector<T> take() {
return _wrap.with([&](std::vector<T> &value) {
return std::exchange(value, std::vector<T>());
Container<T> take() {
return _wrap.with([&](Container<T> &value) {
return std::exchange(value, Container<T>());
});
}
private:
thread_safe_wrap<std::vector<T>> _wrap;
thread_safe_wrap<Container<T>> _wrap;
};

View File

@ -10,6 +10,12 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
namespace Media {
namespace Streaming {
bool LoadedPart::valid(int size) const {
return (offset != kFailedOffset)
&& ((bytes.size() == Loader::kPartSize)
|| (offset + bytes.size() == size));
}
bool operator<(
const PriorityQueue::Entry &a,
const PriorityQueue::Entry &b) {

View File

@ -15,6 +15,8 @@ struct LoadedPart {
QByteArray bytes;
static constexpr auto kFailedOffset = -1;
[[nodiscard]] bool valid(int size) const;
};
class Loader {

View File

@ -573,6 +573,50 @@ auto Reader::Slices::fillFromHeader(int offset, bytes::span buffer)
return result;
}
QByteArray Reader::Slices::partForDownloader(int offset) const {
Expects(offset < _size);
if (const auto i = _header.parts.find(offset); i != end(_header.parts)) {
return i->second;
} else if (isFullInHeader()) {
return QByteArray();
}
const auto index = offset / kInSlice;
const auto &slice = _data[index];
const auto i = slice.parts.find(offset - index * kInSlice);
return (i != end(slice.parts)) ? i->second : QByteArray();
}
std::optional<int> Reader::Slices::readCacheRequiredFor(int offset) {
Expects(offset < _size);
using Flag = Slice::Flag;
if ((_header.flags & Flag::LoadingFromCache) || isFullInHeader()) {
return std::nullopt;
}
const auto index = offset / kInSlice;
auto &slice = _data[index];
if (slice.flags & (Flag::LoadedFromCache | Flag::LoadingFromCache)) {
return std::nullopt;
}
slice.flags |= Flag::LoadingFromCache;
return (index + 1);
}
bool Reader::Slices::waitForCacheRequiredFor(int offset) const {
Expects(offset < _size);
using Flag = Slice::Flag;
if (_header.flags & Flag::LoadingFromCache) {
return true;
} else if (isFullInHeader()) {
return false;
}
const auto index = offset / kInSlice;
const auto &slice = _data[index];
return (slice.flags & Flag::LoadingFromCache);
}
void Reader::Slices::markSliceUsed(int sliceIndex) {
const auto i = ranges::find(_usedSlices, sliceIndex);
const auto end = _usedSlices.end();
@ -711,18 +755,6 @@ QByteArray Reader::Slices::serializeAndUnloadFirstSliceNoHeader() {
return result;
}
template <typename Callback>
void Reader::Slices::enumerateParts(int sliceNumber, Callback &&callback) {
const auto shift = sliceNumber ? ((sliceNumber - 1) * kInSlice) : 0;
const auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header);
for (const auto &[offset, bytes] : slice.parts) {
callback(LoadedPart{ offset + shift, bytes });
}
if (!sliceNumber && isGoodHeader()) {
enumerateParts(1, std::forward<Callback>(callback));
}
}
Reader::SerializedSlice Reader::Slices::unloadToCache() {
if (_headerMode == HeaderMode::Unknown
|| _headerMode == HeaderMode::NoCache) {
@ -816,12 +848,84 @@ void Reader::cancelForDownloader() {
}
}
void Reader::processDownloaderRequests() {
void Reader::enqueueDownloaderOffsets() {
auto offsets = _downloaderOffsetRequests.take();
if (!empty(offsets)) {
if (!empty(_offsetsForDownloader)) {
_offsetsForDownloader.insert(
end(_offsetsForDownloader),
std::make_move_iterator(begin(offsets)),
std::make_move_iterator(end(offsets)));
checkForDownloaderChange(offsets.size() + 1);
} else {
_offsetsForDownloader = std::move(offsets);
checkForDownloaderChange(offsets.size());
}
}
}
void Reader::checkForDownloaderChange(int checkItemsCount) {
Expects(checkItemsCount <= _offsetsForDownloader.size());
// If a requested offset is less-or-equal of some previously requested
// offset, it means that the downloader was changed, ignore old offsets.
const auto end = _offsetsForDownloader.end();
const auto changed = std::adjacent_find(
end - checkItemsCount,
end,
[](int first, int second) { return (second <= first); });
if (changed != end) {
_offsetsForDownloader.erase(
begin(_offsetsForDownloader),
changed + 1);
}
}
void Reader::checkForDownloaderReadyOffsets() {
// If a requested part is available right now we simply fire it on the
// main thread, until the first not-available-right-now offset is found.
const auto unavailable = [&](int offset) {
auto bytes = _slices.partForDownloader(offset);
if (!bytes.isEmpty()) {
crl::on_main(this, [=, bytes = std::move(bytes)]() mutable {
_partsForDownloader.fire({ offset, std::move(bytes) });
});
return false;
}
return true;
};
_offsetsForDownloader.erase(
begin(_offsetsForDownloader),
ranges::find_if(_offsetsForDownloader, unavailable));
}
void Reader::processDownloaderRequests() {
checkForSomethingMoreReceived();
enqueueDownloaderOffsets();
checkForDownloaderReadyOffsets();
if (empty(_offsetsForDownloader)) {
return;
}
const auto offset = _offsetsForDownloader.front();
if (_cacheHelper) {
if (const auto sliceNumber = _slices.readCacheRequiredFor(offset)) {
readFromCache(*sliceNumber);
return;
} else if (_slices.waitForCacheRequiredFor(offset)) {
return;
}
}
_offsetsForDownloader.pop_front();
loadAtOffset(offset);
}
void Reader::checkCacheResultsForDownloader() {
if (_streamingActive) {
return;
}
processDownloaderRequests();
}
bool Reader::isRemoteLoader() const {
@ -922,8 +1026,7 @@ bool Reader::fill(
return false;
};
processCacheResults();
processLoadedParts();
checkForSomethingMoreReceived();
if (_failed) {
return failed();
}
@ -934,7 +1037,7 @@ bool Reader::fill(
return true;
}
startWaiting();
} while (processCacheResults() || processLoadedParts());
} while (checkForSomethingMoreReceived());
return _failed ? failed() : false;
}
@ -1007,34 +1110,17 @@ bool Reader::processCacheResults() {
Assert(loaded.size() > 1);
Assert((loaded.begin() + 1)->first == 1);
}
if (_downloaderAttached.load(std::memory_order_acquire)) {
for (const auto &[sliceNumber, result] : loaded) {
sendPartsToDownloader(sliceNumber);
}
}
return !loaded.empty();
}
void Reader::sendPartsToDownloader(int sliceNumber) {
_slices.enumerateParts(sliceNumber, [&](LoadedPart &&part) {
crl::on_main(this, [=, part = std::move(part)]() mutable {
AssertIsDebug(); // maybe send them with small timeout?
_partsForDownloader.fire(std::move(part));
});
});
}
bool Reader::processLoadedParts() {
if (_failed) {
return false;
}
auto loaded = _loadedParts.take();
for (auto &part : loaded) {
if (part.offset == LoadedPart::kFailedOffset
|| (part.bytes.size() != Loader::kPartSize
&& part.offset + part.bytes.size() != size())) {
if (!part.valid(size())) {
_failed = Error::LoadFailed;
return false;
} else if (!_loadingOffsets.remove(part.offset)) {
@ -1047,6 +1133,12 @@ bool Reader::processLoadedParts() {
return !loaded.empty();
}
bool Reader::checkForSomethingMoreReceived() {
const auto result1 = processCacheResults();
const auto result2 = processLoadedParts();
return result1 || result2;
}
void Reader::loadAtOffset(int offset) {
if (_loadingOffsets.add(offset)) {
_loader->load(offset);

View File

@ -137,9 +137,9 @@ private:
[[nodiscard]] FillResult fill(int offset, bytes::span buffer);
[[nodiscard]] SerializedSlice unloadToCache();
// callback(LoadedPart(..)).
template <typename Callback>
void enumerateParts(int sliceNumber, Callback &&callback);
[[nodiscard]] QByteArray partForDownloader(int offset) const;
[[nodiscard]] std::optional<int> readCacheRequiredFor(int offset);
[[nodiscard]] bool waitForCacheRequiredFor(int offset) const;
private:
enum class HeaderMode {
@ -175,7 +175,6 @@ private:
// 0 is for headerData, slice index = sliceNumber - 1.
void readFromCache(int sliceNumber);
bool processCacheResults();
void sendPartsToDownloader(int sliceNumber);
void putToCache(SerializedSlice &&data);
void cancelLoadInRange(int from, int till);
@ -183,12 +182,17 @@ private:
void checkLoadWillBeFirst(int offset);
bool processLoadedParts();
bool checkForSomethingMoreReceived();
bool fillFromSlices(int offset, bytes::span buffer);
void finalizeCache();
void processDownloaderRequests();
void checkCacheResultsForDownloader();
void enqueueDownloaderOffsets();
void checkForDownloaderChange(int checkItemsCount);
void checkForDownloaderReadyOffsets();
static std::shared_ptr<CacheHelper> InitCacheHelper(
std::optional<Storage::Cache::Key> baseKey);
@ -197,7 +201,7 @@ private:
const std::unique_ptr<Loader> _loader;
const std::shared_ptr<CacheHelper> _cacheHelper;
base::thread_safe_queue<LoadedPart> _loadedParts;
base::thread_safe_queue<LoadedPart, std::vector> _loadedParts;
std::atomic<crl::semaphore*> _waiting = nullptr;
std::atomic<crl::semaphore*> _sleeping = nullptr;
PriorityQueue _loadingOffsets;
@ -207,6 +211,7 @@ private:
std::atomic<bool> _downloaderAttached = false;
base::thread_safe_queue<int> _downloaderOffsetRequests;
std::deque<int> _offsetsForDownloader;
// Main thread.
rpl::event_stream<LoadedPart> _partsForDownloader;