Send packets for processing in batches.

This commit is contained in:
John Preston 2019-12-19 18:14:05 +03:00
parent c4319a7370
commit f51f133832
13 changed files with 167 additions and 77 deletions

View File

@ -1188,7 +1188,7 @@ void Gif::createStreamedPlayer() {
if (_streamed->instance.ready()) {
streamingReady(base::duplicate(_streamed->instance.info()));
}
startStreamedPlayer();
checkStreamedIsStarted();
}
void Gif::startStreamedPlayer() const {

View File

@ -33,7 +33,11 @@ void Loaders::feedFromExternal(ExternalSoundPart &&part) {
QMutexLocker lock(&_fromExternalMutex);
invoke = _fromExternalQueues.empty()
&& _fromExternalForceToBuffer.empty();
_fromExternalQueues[part.audio].push_back(std::move(part.packet));
auto &queue = _fromExternalQueues[part.audio];
queue.insert(
end(queue),
std::make_move_iterator(part.packets.begin()),
std::make_move_iterator(part.packets.end()));
}
if (invoke) {
_fromExternalNotify.call();

View File

@ -22,7 +22,7 @@ struct ExternalSoundData {
struct ExternalSoundPart {
AudioMsgId audio;
FFmpeg::Packet packet;
gsl::span<FFmpeg::Packet> packets;
};
class ChildFFMpegLoader : public AbstractAudioFFMpegLoader {

View File

@ -490,9 +490,10 @@ FFMpegReaderImplementation::PacketResult FFMpegReaderImplementation::readPacket(
if (res == AVERROR_EOF) {
if (_audioStreamId >= 0) {
// queue terminating packet to audio player
auto empty = FFmpeg::Packet();
Player::mixer()->feedFromExternal({
_audioMsgId,
FFmpeg::Packet()
gsl::make_span(&empty, 1)
});
}
return PacketResult::EndOfFile;
@ -519,7 +520,7 @@ void FFMpegReaderImplementation::processPacket(FFmpeg::Packet &&packet) {
// queue packet to audio player
Player::mixer()->feedFromExternal({
_audioMsgId,
std::move(packet)
gsl::make_span(&packet, 1)
});
}
}

View File

@ -47,14 +47,21 @@ crl::time AudioTrack::streamDuration() const {
return _stream.duration;
}
void AudioTrack::process(FFmpeg::Packet &&packet) {
if (packet.empty()) {
void AudioTrack::process(std::vector<FFmpeg::Packet> &&packets) {
if (packets.empty()) {
return;
} else if (packets.front().empty()) {
Assert(packets.size() == 1);
_readTillEnd = true;
}
if (initialized()) {
mixerEnqueue(std::move(packet));
} else if (!tryReadFirstFrame(std::move(packet))) {
_error(Error::InvalidData);
for (auto i = begin(packets), e = end(packets); i != e; ++i) {
if (initialized()) {
mixerEnqueue(gsl::make_span(&*i, (e - i)));
break;
} else if (!tryReadFirstFrame(std::move(*i))) {
_error(Error::InvalidData);
break;
}
}
}
@ -148,10 +155,10 @@ void AudioTrack::callReady() {
base::take(_ready)({ VideoInformation(), data });
}
void AudioTrack::mixerEnqueue(FFmpeg::Packet &&packet) {
void AudioTrack::mixerEnqueue(gsl::span<FFmpeg::Packet> packets) {
Media::Player::mixer()->feedFromExternal({
_audioId,
std::move(packet)
packets
});
}
@ -172,8 +179,6 @@ void AudioTrack::resume(crl::time time) {
}
void AudioTrack::stop() {
Expects(initialized());
if (_audioId.externalPlayId()) {
Media::Player::mixer()->stop(_audioId);
}

View File

@ -46,7 +46,7 @@ public:
[[nodiscard]] crl::time streamDuration() const;
// Called from the same unspecified thread.
void process(FFmpeg::Packet &&packet);
void process(std::vector<FFmpeg::Packet> &&packets);
void waitForData();
// Called from the main thread.
@ -59,7 +59,7 @@ private:
[[nodiscard]] bool fillStateFromFrame();
[[nodiscard]] bool processFirstFrame();
void mixerInit();
void mixerEnqueue(FFmpeg::Packet &&packet);
void mixerEnqueue(gsl::span<FFmpeg::Packet> packets);
void mixerForceToBuffer();
void callReady();

View File

@ -16,6 +16,7 @@ namespace Streaming {
namespace {
constexpr auto kMaxSingleReadAmount = 8 * 1024 * 1024;
constexpr auto kMaxQueuedPackets = 1024;
} // namespace
@ -54,6 +55,7 @@ int File::Context::read(bytes::span buffer) {
buffer = buffer.subspan(0, amount);
while (!_reader->fill(_offset, buffer, &_semaphore)) {
processQueuedPackets(SleepPolicy::Disallowed);
_delegate->fileWaitingForData();
_semaphore.acquire();
if (_interrupted) {
@ -264,6 +266,13 @@ void File::Context::start(crl::time position) {
return;
}
if (video.codec) {
_queuedPackets[video.index].reserve(kMaxQueuedPackets);
}
if (audio.codec) {
_queuedPackets[audio.index].reserve(kMaxQueuedPackets);
}
const auto header = _reader->headerSize();
if (!_delegate->fileReady(header, std::move(video), std::move(audio))) {
return fail(Error::OpenFailed);
@ -287,24 +296,28 @@ void File::Context::readNextPacket() {
if (unroll()) {
return;
} else if (const auto packet = base::get_if<FFmpeg::Packet>(&result)) {
const auto more = _delegate->fileProcessPacket(std::move(*packet));
if (!more) {
do {
_reader->startSleep(&_semaphore);
_semaphore.acquire();
_reader->stopSleep();
} while (!unroll() && !_delegate->fileReadMore());
const auto index = packet->fields().stream_index;
const auto i = _queuedPackets.find(index);
if (i == end(_queuedPackets)) {
return;
}
i->second.push_back(std::move(*packet));
if (i->second.size() == kMaxQueuedPackets) {
processQueuedPackets(SleepPolicy::Allowed);
}
} else {
// Still trying to read by drain.
Assert(result.is<FFmpeg::AvErrorWrap>());
Assert(result.get<FFmpeg::AvErrorWrap>().code() == AVERROR_EOF);
handleEndOfFile();
processQueuedPackets(SleepPolicy::Allowed);
if (!finished()) {
handleEndOfFile();
}
}
}
void File::Context::handleEndOfFile() {
const auto more = _delegate->fileProcessPacket(FFmpeg::Packet());
const auto more = _delegate->fileProcessEndOfFile();
if (_delegate->fileReadMore()) {
_readTillEnd = false;
auto error = FFmpeg::AvErrorWrap(av_seek_frame(
@ -320,6 +333,17 @@ void File::Context::handleEndOfFile() {
}
}
void File::Context::processQueuedPackets(SleepPolicy policy) {
const auto more = _delegate->fileProcessPackets(_queuedPackets);
if (!more && policy == SleepPolicy::Allowed) {
do {
_reader->startSleep(&_semaphore);
_semaphore.acquire();
_reader->stopSleep();
} while (!unroll() && !_delegate->fileReadMore());
}
}
void File::Context::interrupt() {
_interrupted = true;
_semaphore.release();

View File

@ -58,6 +58,10 @@ private:
void waitTillInterrupted();
private:
enum class SleepPolicy {
Allowed,
Disallowed,
};
static int Read(void *opaque, uint8_t *buffer, int bufferSize);
static int64_t Seek(void *opaque, int64_t offset, int whence);
@ -82,6 +86,7 @@ private:
// TODO base::expected.
[[nodiscard]] auto readPacket()
-> base::variant<FFmpeg::Packet, FFmpeg::AvErrorWrap>;
void processQueuedPackets(SleepPolicy policy);
void handleEndOfFile();
void sendFullInCache(bool force = false);
@ -89,6 +94,7 @@ private:
const not_null<FileDelegate*> _delegate;
const not_null<Reader*> _reader;
base::flat_map<int, std::vector<FFmpeg::Packet>> _queuedPackets;
int _offset = 0;
int _size = 0;
bool _failed = false;

View File

@ -29,9 +29,10 @@ public:
// Return true if reading and processing more packets is desired.
// Return false if sleeping until 'wake()' is called is desired.
// Return true after the EOF packet if looping is desired.
[[nodiscard]] virtual bool fileProcessPacket(
FFmpeg::Packet &&packet) = 0;
[[nodiscard]] virtual bool fileProcessPackets(
base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) = 0;
// Return true if looping is desired.
[[nodiscard]] virtual bool fileProcessEndOfFile() = 0;
[[nodiscard]] virtual bool fileReadMore() = 0;
};

View File

@ -343,54 +343,82 @@ void Player::fileWaitingForData() {
}
}
bool Player::fileProcessPacket(FFmpeg::Packet &&packet) {
bool Player::fileProcessPackets(
base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) {
_waitingForData = false;
const auto &native = packet.fields();
const auto index = native.stream_index;
if (packet.empty()) {
_readTillEnd = true;
setDurationByPackets();
if (_audio) {
const auto till = _loopingShift + computeAudioDuration();
auto audioTill = kTimeUnknown;
auto videoTill = kTimeUnknown;
for (auto &[index, list] : packets) {
if (list.empty()) {
continue;
}
if (_audio && _audio->streamIndex() == index) {
//for (const auto &packet : list) {
// // Maybe it is enough to count by list.back()?.. hope so.
// accumulate_max(
// _durationByLastAudioPacket,
// durationByPacket(*_audio, packet));
//}
accumulate_max(
_durationByLastAudioPacket,
durationByPacket(*_audio, list.back()));
const auto till = _loopingShift + std::clamp(
FFmpeg::PacketPosition(
list.back(),
_audio->streamTimeBase()),
crl::time(0),
computeAudioDuration() - 1);
crl::on_main(&_sessionGuard, [=] {
audioReceivedTill(till);
});
_audio->process(FFmpeg::Packet());
}
if (_video) {
const auto till = _loopingShift + computeVideoDuration();
_audio->process(base::take(list));
} else if (_video && _video->streamIndex() == index) {
//for (const auto &packet : list) {
// // Maybe it is enough to count by list.back()?.. hope so.
// accumulate_max(
// _durationByLastVideoPacket,
// durationByPacket(*_video, packet));
//}
accumulate_max(
_durationByLastVideoPacket,
durationByPacket(*_video, list.back()));
const auto till = _loopingShift + std::clamp(
FFmpeg::PacketPosition(
list.back(),
_video->streamTimeBase()),
crl::time(0),
computeVideoDuration() - 1);
crl::on_main(&_sessionGuard, [=] {
videoReceivedTill(till);
});
_video->process(FFmpeg::Packet());
_video->process(base::take(list));
}
} else if (_audio && _audio->streamIndex() == native.stream_index) {
accumulate_max(
_durationByLastAudioPacket,
durationByPacket(*_audio, packet));
}
return fileReadMore();
}
const auto till = _loopingShift + std::clamp(
FFmpeg::PacketPosition(packet, _audio->streamTimeBase()),
crl::time(0),
computeAudioDuration() - 1);
bool Player::fileProcessEndOfFile() {
_waitingForData = false;
_readTillEnd = true;
setDurationByPackets();
const auto generateEmptyQueue = [] {
auto result = std::vector<FFmpeg::Packet>();
result.emplace_back();
return result;
};
if (_audio) {
const auto till = _loopingShift + computeAudioDuration();
crl::on_main(&_sessionGuard, [=] {
audioReceivedTill(till);
});
_audio->process(std::move(packet));
} else if (_video && _video->streamIndex() == native.stream_index) {
accumulate_max(
_durationByLastVideoPacket,
durationByPacket(*_video, packet));
const auto till = _loopingShift + std::clamp(
FFmpeg::PacketPosition(packet, _video->streamTimeBase()),
crl::time(0),
computeVideoDuration() - 1);
_audio->process(generateEmptyQueue());
}
if (_video) {
const auto till = _loopingShift + computeVideoDuration();
crl::on_main(&_sessionGuard, [=] {
videoReceivedTill(till);
});
_video->process(std::move(packet));
_video->process(generateEmptyQueue());
}
return fileReadMore();
}

View File

@ -97,7 +97,9 @@ private:
void fileError(Error error) override;
void fileWaitingForData() override;
void fileFullInCache(bool fullInCache) override;
bool fileProcessPacket(FFmpeg::Packet &&packet) override;
bool fileProcessPackets(
base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) override;
bool fileProcessEndOfFile() override;
bool fileReadMore() override;
// Called from the main thread.

View File

@ -34,7 +34,7 @@ public:
FnMut<void(const Information &)> ready,
Fn<void(Error)> error);
void process(FFmpeg::Packet &&packet);
void process(std::vector<FFmpeg::Packet> &&packets);
[[nodisacrd]] rpl::producer<> checkNextFrame() const;
[[nodisacrd]] rpl::producer<> waitingForData() const;
@ -149,25 +149,42 @@ rpl::producer<> VideoTrackObject::waitingForData() const {
: _waitingForData.events();
}
void VideoTrackObject::process(FFmpeg::Packet &&packet) {
if (interrupted()) {
void VideoTrackObject::process(std::vector<FFmpeg::Packet> &&packets) {
if (interrupted() || packets.empty()) {
return;
}
if (packet.empty()) {
if (packets.front().empty()) {
Assert(packets.size() == 1);
_readTillEnd = true;
} else if (!_readTillEnd) {
//for (const auto &packet : packets) {
// // Maybe it is enough to count by list.back()?.. hope so.
// accumulate_max(
// _durationByLastPacket,
// durationByPacket(packet));
// if (interrupted()) {
// return;
// }
//}
accumulate_max(
_durationByLastPacket,
durationByPacket(packet));
durationByPacket(packets.back()));
if (interrupted()) {
return;
}
}
if (_shared->initialized()) {
_stream.queue.push_back(std::move(packet));
queueReadFrames();
} else if (!tryReadFirstFrame(std::move(packet))) {
fail(Error::InvalidData);
for (auto i = begin(packets), e = end(packets); i != e; ++i) {
if (_shared->initialized()) {
_stream.queue.insert(
end(_stream.queue),
std::make_move_iterator(i),
std::make_move_iterator(e));
queueReadFrames();
break;
} else if (!tryReadFirstFrame(std::move(*i))) {
fail(Error::InvalidData);
break;
}
}
}
@ -547,6 +564,7 @@ void VideoTrackObject::callReady() {
? _stream.duration
: _syncTimePoint.trackTime;
base::take(_ready)({ data });
LOG(("READY CALLED!"));
}
TimePoint VideoTrackObject::trackTime() const {
@ -887,11 +905,12 @@ crl::time VideoTrack::streamDuration() const {
return _streamDuration;
}
void VideoTrack::process(FFmpeg::Packet &&packet) {
void VideoTrack::process(std::vector<FFmpeg::Packet> &&packets) {
LOG(("PACKETS! (%1)").arg(packets.size()));
_wrapped.with([
packet = std::move(packet)
packets = std::move(packets)
](Implementation &unwrapped) mutable {
unwrapped.process(std::move(packet));
unwrapped.process(std::move(packets));
});
}

View File

@ -37,7 +37,7 @@ public:
[[nodiscard]] crl::time streamDuration() const;
// Called from the same unspecified thread.
void process(FFmpeg::Packet &&packet);
void process(std::vector<FFmpeg::Packet> &&packets);
void waitForData();
// Called from the main thread.