From f51f133832ee2024ff0f30560bf4c944860307df Mon Sep 17 00:00:00 2001
From: John Preston <johnprestonmail@gmail.com>
Date: Thu, 19 Dec 2019 18:14:05 +0300
Subject: [PATCH] Send packets for processing in batches.

---
 .../history/view/media/history_view_gif.cpp   |  2 +-
 .../media/audio/media_audio_loaders.cpp       |  6 +-
 .../media/audio/media_child_ffmpeg_loader.h   |  2 +-
 .../media/clip/media_clip_ffmpeg.cpp          |  5 +-
 .../streaming/media_streaming_audio_track.cpp | 25 +++--
 .../streaming/media_streaming_audio_track.h   |  4 +-
 .../media/streaming/media_streaming_file.cpp  | 42 +++++++--
 .../media/streaming/media_streaming_file.h    |  6 ++
 .../streaming/media_streaming_file_delegate.h |  7 +-
 .../streaming/media_streaming_player.cpp      | 94 ++++++++++++-------
 .../media/streaming/media_streaming_player.h  |  4 +-
 .../streaming/media_streaming_video_track.cpp | 45 ++++++---
 .../streaming/media_streaming_video_track.h   |  2 +-
 13 files changed, 167 insertions(+), 77 deletions(-)

diff --git a/Telegram/SourceFiles/history/view/media/history_view_gif.cpp b/Telegram/SourceFiles/history/view/media/history_view_gif.cpp
index af8b57c1a..554621b7c 100644
--- a/Telegram/SourceFiles/history/view/media/history_view_gif.cpp
+++ b/Telegram/SourceFiles/history/view/media/history_view_gif.cpp
@@ -1188,7 +1188,7 @@ void Gif::createStreamedPlayer() {
 	if (_streamed->instance.ready()) {
 		streamingReady(base::duplicate(_streamed->instance.info()));
 	}
-	startStreamedPlayer();
+	checkStreamedIsStarted();
 }
 
 void Gif::startStreamedPlayer() const {
diff --git a/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp b/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp
index 44b3d534b..82c0041a3 100644
--- a/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp
+++ b/Telegram/SourceFiles/media/audio/media_audio_loaders.cpp
@@ -33,7 +33,11 @@ void Loaders::feedFromExternal(ExternalSoundPart &&part) {
 		QMutexLocker lock(&_fromExternalMutex);
 		invoke = _fromExternalQueues.empty()
 			&& _fromExternalForceToBuffer.empty();
-		_fromExternalQueues[part.audio].push_back(std::move(part.packet));
+		auto &queue = _fromExternalQueues[part.audio];
+		queue.insert(
+			end(queue),
+			std::make_move_iterator(part.packets.begin()),
+			std::make_move_iterator(part.packets.end()));
 	}
 	if (invoke) {
 		_fromExternalNotify.call();
diff --git a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h
index 7b9e18a8a..95fb9a320 100644
--- a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h
+++ b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h
@@ -22,7 +22,7 @@ struct ExternalSoundData {
 
 struct ExternalSoundPart {
 	AudioMsgId audio;
-	FFmpeg::Packet packet;
+	gsl::span<FFmpeg::Packet> packets;
 };
 
 class ChildFFMpegLoader : public AbstractAudioFFMpegLoader {
diff --git a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp
index e4dfbe18e..68b56a388 100644
--- a/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp
+++ b/Telegram/SourceFiles/media/clip/media_clip_ffmpeg.cpp
@@ -490,9 +490,10 @@ FFMpegReaderImplementation::PacketResult FFMpegReaderImplementation::readPacket(
 		if (res == AVERROR_EOF) {
 			if (_audioStreamId >= 0) {
 				// queue terminating packet to audio player
+				auto empty = FFmpeg::Packet();
 				Player::mixer()->feedFromExternal({
 					_audioMsgId,
-					FFmpeg::Packet()
+					gsl::make_span(&empty, 1)
 				});
 			}
 			return PacketResult::EndOfFile;
@@ -519,7 +520,7 @@ void FFMpegReaderImplementation::processPacket(FFmpeg::Packet &&packet) {
 			// queue packet to audio player
 			Player::mixer()->feedFromExternal({
 				_audioMsgId,
-				std::move(packet)
+				gsl::make_span(&packet, 1)
 			});
 		}
 	}
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp
index 00f21d856..dc573c06c 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp
@@ -47,14 +47,21 @@ crl::time AudioTrack::streamDuration() const {
 	return _stream.duration;
 }
 
-void AudioTrack::process(FFmpeg::Packet &&packet) {
-	if (packet.empty()) {
+void AudioTrack::process(std::vector<FFmpeg::Packet> &&packets) {
+	if (packets.empty()) {
+		return;
+	} else if (packets.front().empty()) {
+		Assert(packets.size() == 1);
 		_readTillEnd = true;
 	}
-	if (initialized()) {
-		mixerEnqueue(std::move(packet));
-	} else if (!tryReadFirstFrame(std::move(packet))) {
-		_error(Error::InvalidData);
+	for (auto i = begin(packets), e = end(packets); i != e; ++i) {
+		if (initialized()) {
+			mixerEnqueue(gsl::make_span(&*i, (e - i)));
+			break;
+		} else if (!tryReadFirstFrame(std::move(*i))) {
+			_error(Error::InvalidData);
+			break;
+		}
 	}
 }
 
@@ -148,10 +155,10 @@ void AudioTrack::callReady() {
 	base::take(_ready)({ VideoInformation(), data });
 }
 
-void AudioTrack::mixerEnqueue(FFmpeg::Packet &&packet) {
+void AudioTrack::mixerEnqueue(gsl::span<FFmpeg::Packet> packets) {
 	Media::Player::mixer()->feedFromExternal({
 		_audioId,
-		std::move(packet)
+		packets
 	});
 }
 
@@ -172,8 +179,6 @@ void AudioTrack::resume(crl::time time) {
 }
 
 void AudioTrack::stop() {
-	Expects(initialized());
-
 	if (_audioId.externalPlayId()) {
 		Media::Player::mixer()->stop(_audioId);
 	}
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h
index 5d05c58b2..e0ecedfc0 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h
@@ -46,7 +46,7 @@ public:
 	[[nodiscard]] crl::time streamDuration() const;
 
 	// Called from the same unspecified thread.
-	void process(FFmpeg::Packet &&packet);
+	void process(std::vector<FFmpeg::Packet> &&packets);
 	void waitForData();
 
 	// Called from the main thread.
@@ -59,7 +59,7 @@ private:
 	[[nodiscard]] bool fillStateFromFrame();
 	[[nodiscard]] bool processFirstFrame();
 	void mixerInit();
-	void mixerEnqueue(FFmpeg::Packet &&packet);
+	void mixerEnqueue(gsl::span<FFmpeg::Packet> packets);
 	void mixerForceToBuffer();
 	void callReady();
 
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
index cec4f762f..19fd90eb5 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
@@ -16,6 +16,7 @@ namespace Streaming {
 namespace {
 
 constexpr auto kMaxSingleReadAmount = 8 * 1024 * 1024;
+constexpr auto kMaxQueuedPackets = 1024;
 
 } // namespace
 
@@ -54,6 +55,7 @@ int File::Context::read(bytes::span buffer) {
 
 	buffer = buffer.subspan(0, amount);
 	while (!_reader->fill(_offset, buffer, &_semaphore)) {
+		processQueuedPackets(SleepPolicy::Disallowed);
 		_delegate->fileWaitingForData();
 		_semaphore.acquire();
 		if (_interrupted) {
@@ -264,6 +266,13 @@ void File::Context::start(crl::time position) {
 		return;
 	}
 
+	if (video.codec) {
+		_queuedPackets[video.index].reserve(kMaxQueuedPackets);
+	}
+	if (audio.codec) {
+		_queuedPackets[audio.index].reserve(kMaxQueuedPackets);
+	}
+
 	const auto header = _reader->headerSize();
 	if (!_delegate->fileReady(header, std::move(video), std::move(audio))) {
 		return fail(Error::OpenFailed);
@@ -287,24 +296,28 @@ void File::Context::readNextPacket() {
 	if (unroll()) {
 		return;
 	} else if (const auto packet = base::get_if<FFmpeg::Packet>(&result)) {
-		const auto more = _delegate->fileProcessPacket(std::move(*packet));
-		if (!more) {
-			do {
-				_reader->startSleep(&_semaphore);
-				_semaphore.acquire();
-				_reader->stopSleep();
-			} while (!unroll() && !_delegate->fileReadMore());
+		const auto index = packet->fields().stream_index;
+		const auto i = _queuedPackets.find(index);
+		if (i == end(_queuedPackets)) {
+			return;
+		}
+		i->second.push_back(std::move(*packet));
+		if (i->second.size() == kMaxQueuedPackets) {
+			processQueuedPackets(SleepPolicy::Allowed);
 		}
 	} else {
 		// Still trying to read by drain.
 		Assert(result.is<FFmpeg::AvErrorWrap>());
 		Assert(result.get<FFmpeg::AvErrorWrap>().code() == AVERROR_EOF);
-		handleEndOfFile();
+		processQueuedPackets(SleepPolicy::Allowed);
+		if (!finished()) {
+			handleEndOfFile();
+		}
 	}
 }
 
 void File::Context::handleEndOfFile() {
-	const auto more = _delegate->fileProcessPacket(FFmpeg::Packet());
+	const auto more = _delegate->fileProcessEndOfFile();
 	if (_delegate->fileReadMore()) {
 		_readTillEnd = false;
 		auto error = FFmpeg::AvErrorWrap(av_seek_frame(
@@ -320,6 +333,17 @@ void File::Context::handleEndOfFile() {
 	}
 }
 
+void File::Context::processQueuedPackets(SleepPolicy policy) {
+	const auto more = _delegate->fileProcessPackets(_queuedPackets);
+	if (!more && policy == SleepPolicy::Allowed) {
+		do {
+			_reader->startSleep(&_semaphore);
+			_semaphore.acquire();
+			_reader->stopSleep();
+		} while (!unroll() && !_delegate->fileReadMore());
+	}
+}
+
 void File::Context::interrupt() {
 	_interrupted = true;
 	_semaphore.release();
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.h b/Telegram/SourceFiles/media/streaming/media_streaming_file.h
index 773cbe926..63450e89e 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.h
@@ -58,6 +58,10 @@ private:
 		void waitTillInterrupted();
 
 	private:
+		enum class SleepPolicy {
+			Allowed,
+			Disallowed,
+		};
 		static int Read(void *opaque, uint8_t *buffer, int bufferSize);
 		static int64_t Seek(void *opaque, int64_t offset, int whence);
 
@@ -82,6 +86,7 @@ private:
 		// TODO base::expected.
 		[[nodiscard]] auto readPacket()
 		-> base::variant<FFmpeg::Packet, FFmpeg::AvErrorWrap>;
+		void processQueuedPackets(SleepPolicy policy);
 
 		void handleEndOfFile();
 		void sendFullInCache(bool force = false);
@@ -89,6 +94,7 @@ private:
 		const not_null<FileDelegate*> _delegate;
 		const not_null<Reader*> _reader;
 
+		base::flat_map<int, std::vector<FFmpeg::Packet>> _queuedPackets;
 		int _offset = 0;
 		int _size = 0;
 		bool _failed = false;
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h b/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h
index 9b001b2de..2facdbe74 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file_delegate.h
@@ -29,9 +29,10 @@ public:
 
 	// Return true if reading and processing more packets is desired.
 	// Return false if sleeping until 'wake()' is called is desired.
-	// Return true after the EOF packet if looping is desired.
-	[[nodiscard]] virtual bool fileProcessPacket(
-		FFmpeg::Packet &&packet) = 0;
+	[[nodiscard]] virtual bool fileProcessPackets(
+		base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) = 0;
+	// Return true if looping is desired.
+	[[nodiscard]] virtual bool fileProcessEndOfFile() = 0;
 	[[nodiscard]] virtual bool fileReadMore() = 0;
 };
 
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
index 56c8aa887..96f85e17f 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
@@ -343,54 +343,82 @@ void Player::fileWaitingForData() {
 	}
 }
 
-bool Player::fileProcessPacket(FFmpeg::Packet &&packet) {
+bool Player::fileProcessPackets(
+		base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) {
 	_waitingForData = false;
-
-	const auto &native = packet.fields();
-	const auto index = native.stream_index;
-	if (packet.empty()) {
-		_readTillEnd = true;
-		setDurationByPackets();
-		if (_audio) {
-			const auto till = _loopingShift + computeAudioDuration();
+	auto audioTill = kTimeUnknown;
+	auto videoTill = kTimeUnknown;
+	for (auto &[index, list] : packets) {
+		if (list.empty()) {
+			continue;
+		}
+		if (_audio && _audio->streamIndex() == index) {
+			//for (const auto &packet : list) {
+			//	// Maybe it is enough to count by list.back()?.. hope so.
+			//	accumulate_max(
+			//		_durationByLastAudioPacket,
+			//		durationByPacket(*_audio, packet));
+			//}
+			accumulate_max(
+				_durationByLastAudioPacket,
+				durationByPacket(*_audio, list.back()));
+			const auto till = _loopingShift + std::clamp(
+				FFmpeg::PacketPosition(
+					list.back(),
+					_audio->streamTimeBase()),
+				crl::time(0),
+				computeAudioDuration() - 1);
 			crl::on_main(&_sessionGuard, [=] {
 				audioReceivedTill(till);
 			});
-			_audio->process(FFmpeg::Packet());
-		}
-		if (_video) {
-			const auto till = _loopingShift + computeVideoDuration();
+			_audio->process(base::take(list));
+		} else if (_video && _video->streamIndex() == index) {
+			//for (const auto &packet : list) {
+			//	// Maybe it is enough to count by list.back()?.. hope so.
+			//	accumulate_max(
+			//		_durationByLastVideoPacket,
+			//		durationByPacket(*_video, packet));
+			//}
+			accumulate_max(
+				_durationByLastVideoPacket,
+				durationByPacket(*_video, list.back()));
+			const auto till = _loopingShift + std::clamp(
+				FFmpeg::PacketPosition(
+					list.back(),
+					_video->streamTimeBase()),
+				crl::time(0),
+				computeVideoDuration() - 1);
 			crl::on_main(&_sessionGuard, [=] {
 				videoReceivedTill(till);
 			});
-			_video->process(FFmpeg::Packet());
+			_video->process(base::take(list));
 		}
-	} else if (_audio && _audio->streamIndex() == native.stream_index) {
-		accumulate_max(
-			_durationByLastAudioPacket,
-			durationByPacket(*_audio, packet));
+	}
+	return fileReadMore();
+}
 
-		const auto till = _loopingShift + std::clamp(
-			FFmpeg::PacketPosition(packet, _audio->streamTimeBase()),
-			crl::time(0),
-			computeAudioDuration() - 1);
+bool Player::fileProcessEndOfFile() {
+	_waitingForData = false;
+	_readTillEnd = true;
+	setDurationByPackets();
+	const auto generateEmptyQueue = [] {
+		auto result = std::vector<FFmpeg::Packet>();
+		result.emplace_back();
+		return result;
+	};
+	if (_audio) {
+		const auto till = _loopingShift + computeAudioDuration();
 		crl::on_main(&_sessionGuard, [=] {
 			audioReceivedTill(till);
 		});
-		_audio->process(std::move(packet));
-	} else if (_video && _video->streamIndex() == native.stream_index) {
-		accumulate_max(
-			_durationByLastVideoPacket,
-			durationByPacket(*_video, packet));
-
-		const auto till = _loopingShift + std::clamp(
-			FFmpeg::PacketPosition(packet, _video->streamTimeBase()),
-			crl::time(0),
-			computeVideoDuration() - 1);
+		_audio->process(generateEmptyQueue());
+	}
+	if (_video) {
+		const auto till = _loopingShift + computeVideoDuration();
 		crl::on_main(&_sessionGuard, [=] {
 			videoReceivedTill(till);
 		});
-		_video->process(std::move(packet));
+		_video->process(generateEmptyQueue());
 	}
 	return fileReadMore();
 }
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.h b/Telegram/SourceFiles/media/streaming/media_streaming_player.h
index 6182af4fb..73eecb7d3 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_player.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.h
@@ -97,7 +97,9 @@ private:
 	void fileError(Error error) override;
 	void fileWaitingForData() override;
 	void fileFullInCache(bool fullInCache) override;
-	bool fileProcessPacket(FFmpeg::Packet &&packet) override;
+	bool fileProcessPackets(
+		base::flat_map<int, std::vector<FFmpeg::Packet>> &packets) override;
+	bool fileProcessEndOfFile() override;
 	bool fileReadMore() override;
 
 	// Called from the main thread.
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp
index a1e1e4b2b..058739442 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp
@@ -34,7 +34,7 @@ public:
 		FnMut<void(const Information &)> ready,
 		Fn<void(Error)> error);
 
-	void process(FFmpeg::Packet &&packet);
+	void process(std::vector<FFmpeg::Packet> &&packets);
 
 	[[nodisacrd]] rpl::producer<> checkNextFrame() const;
 	[[nodisacrd]] rpl::producer<> waitingForData() const;
@@ -149,25 +149,42 @@ rpl::producer<> VideoTrackObject::waitingForData() const {
 		: _waitingForData.events();
 }
 
-void VideoTrackObject::process(FFmpeg::Packet &&packet) {
-	if (interrupted()) {
+void VideoTrackObject::process(std::vector<FFmpeg::Packet> &&packets) {
+	if (interrupted() || packets.empty()) {
 		return;
 	}
-	if (packet.empty()) {
+	if (packets.front().empty()) {
+		Assert(packets.size() == 1);
 		_readTillEnd = true;
 	} else if (!_readTillEnd) {
+		//for (const auto &packet : packets) {
+		//	// Maybe it is enough to count by list.back()?.. hope so.
+		//	accumulate_max(
+		//		_durationByLastPacket,
+		//		durationByPacket(packet));
+		//	if (interrupted()) {
+		//		return;
+		//	}
+		//}
 		accumulate_max(
 			_durationByLastPacket,
-			durationByPacket(packet));
+			durationByPacket(packets.back()));
 		if (interrupted()) {
 			return;
 		}
 	}
-	if (_shared->initialized()) {
-		_stream.queue.push_back(std::move(packet));
-		queueReadFrames();
-	} else if (!tryReadFirstFrame(std::move(packet))) {
-		fail(Error::InvalidData);
+	for (auto i = begin(packets), e = end(packets); i != e; ++i) {
+		if (_shared->initialized()) {
+			_stream.queue.insert(
+				end(_stream.queue),
+				std::make_move_iterator(i),
+				std::make_move_iterator(e));
+			queueReadFrames();
+			break;
+		} else if (!tryReadFirstFrame(std::move(*i))) {
+			fail(Error::InvalidData);
+			break;
+		}
 	}
 }
 
@@ -547,6 +564,7 @@ void VideoTrackObject::callReady() {
 		? _stream.duration
 		: _syncTimePoint.trackTime;
 	base::take(_ready)({ data });
+	LOG(("READY CALLED!"));
 }
 
 TimePoint VideoTrackObject::trackTime() const {
@@ -887,11 +905,12 @@ crl::time VideoTrack::streamDuration() const {
 	return _streamDuration;
 }
 
-void VideoTrack::process(FFmpeg::Packet &&packet) {
+void VideoTrack::process(std::vector<FFmpeg::Packet> &&packets) {
+	LOG(("PACKETS! (%1)").arg(packets.size()));
 	_wrapped.with([
-		packet = std::move(packet)
+		packets = std::move(packets)
 	](Implementation &unwrapped) mutable {
-		unwrapped.process(std::move(packet));
+		unwrapped.process(std::move(packets));
 	});
 }
 
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h
index 3ed55c5ea..434e061ee 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h
@@ -37,7 +37,7 @@ public:
 	[[nodiscard]] crl::time streamDuration() const;
 
 	// Called from the same unspecified thread.
-	void process(FFmpeg::Packet &&packet);
+	void process(std::vector<FFmpeg::Packet> &&packets);
 	void waitForData();
 
 	// Called from the main thread.