diff --git a/Telegram/SourceFiles/base/weak_ptr.h b/Telegram/SourceFiles/base/weak_ptr.h index 76dd3de83..d396501c7 100644 --- a/Telegram/SourceFiles/base/weak_ptr.h +++ b/Telegram/SourceFiles/base/weak_ptr.h @@ -57,12 +57,21 @@ public: } ~has_weak_ptr() { - if (auto alive = _alive.load()) { + if (const auto alive = _alive.load()) { alive->value.store(nullptr); details::decrement(alive); } } + friend inline void invalidate_weak_ptrs(has_weak_ptr *object) { + if (auto alive = object->_alive.load()) { + if (object->_alive.compare_exchange_strong(alive, nullptr)) { + alive->value.store(nullptr); + details::decrement(alive); + } + } + } + private: template friend class weak_ptr; diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp index 92171664e..e6dcb7bf5 100644 --- a/Telegram/SourceFiles/data/data_document.cpp +++ b/Telegram/SourceFiles/data/data_document.cpp @@ -287,6 +287,90 @@ QString documentSaveFilename(const DocumentData *data, bool forceSavingAs = fals return FileNameForSave(caption, filter, prefix, name, forceSavingAs, dir); } +void StartStreaming( + not_null document, + Data::FileOrigin origin) { + AssertIsDebug(); + + using namespace Media::Streaming; + if (auto loader = document->createStreamingLoader(origin)) { + class Panel +#if defined Q_OS_MAC && !defined OS_MAC_OLD + : public Ui::RpWidgetWrap { + using Parent = Ui::RpWidgetWrap; +#else // Q_OS_MAC && !OS_MAC_OLD + : public Ui::RpWidget { + using Parent = Ui::RpWidget; +#endif // Q_OS_MAC && !OS_MAC_OLD + + public: + Panel() : Parent(nullptr) { + } + + protected: + void paintEvent(QPaintEvent *e) override { + } + + }; + + static auto player = std::unique_ptr(); + static auto video = base::unique_qptr(); + player = std::make_unique( + &document->owner(), + std::move(loader)); + video = nullptr; + document->session().lifetime().add([] { + base::take(player) = nullptr; + base::take(video) = nullptr; + }); + + player->init( + (document->isAudioFile() ? Mode::Audio : Mode::Both), + 0); + player->updates( + ) | rpl::start_with_next_error_done([=](Update &&update) { + update.data.match([&](Information &update) { + if (!update.videoCover.isNull()) { + video = base::make_unique_q(); + video->setAttribute(Qt::WA_OpaquePaintEvent); + video->paintRequest( + ) | rpl::start_with_next([=](QRect rect) { + Painter(video.get()).drawImage( + video->rect(), + player->frame(FrameRequest())); + }, video->lifetime()); + const auto size = QSize( + ConvertScale(update.videoSize.width()), + ConvertScale(update.videoSize.height())); + const auto center = App::wnd()->geometry().center(); + video->setGeometry(QRect( + center - QPoint(size.width(), size.height()) / 2, + size)); + video->show(); + video->shownValue( + ) | rpl::start_with_next([=](bool shown) { + if (!shown) { + base::take(player) = nullptr; + } + }, video->lifetime()); + } + player->start(); + }, [&](UpdateVideo &update) { + Expects(video != nullptr); + + video->update(); + }, [&](UpdateAudio &update) { + }, [&](WaitingForData &update) { + }, [&](MutedByOther &update) { + }); + }, [=](const Error &error) { + base::take(video) = nullptr; + }, [=] { + base::take(video) = nullptr; + }, player->lifetime()); + } +} + void DocumentOpenClickHandler::Open( Data::FileOrigin origin, not_null data, @@ -308,21 +392,7 @@ void DocumentOpenClickHandler::Open( } } if (data->isAudioFile() || data->isVideoFile()) { - AssertIsDebug(); - if (auto loader = data->createStreamingLoader(origin)) { - static auto player = std::unique_ptr(); - player = std::make_unique( - &data->owner(), - std::move(loader)); - data->session().lifetime().add([] { - player = nullptr; - }); - player->init( - (data->isAudioFile() - ? Media::Streaming::Mode::Audio - : Media::Streaming::Mode::Video), - 0); - } + StartStreaming(data, origin); return; } if (!location.isEmpty() || (!data->data().isEmpty() && (playVoice || playMusic || playVideo || playAnimation))) { @@ -432,21 +502,7 @@ void DocumentSaveClickHandler::Save( if (!data->date) return; if (data->isAudioFile() || data->isVideoFile()) { - AssertIsDebug(); - if (auto loader = data->createStreamingLoader(origin)) { - static auto player = std::unique_ptr(); - player = std::make_unique( - &data->owner(), - std::move(loader)); - data->session().lifetime().add([] { - player = nullptr; - }); - player->init( - (data->isAudioFile() - ? Media::Streaming::Mode::Audio - : Media::Streaming::Mode::Video), - 0); - } + StartStreaming(data, origin); return; } diff --git a/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.cpp b/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.cpp index 267e1c7d0..684a4638d 100644 --- a/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.cpp +++ b/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.cpp @@ -259,6 +259,16 @@ bool AbstractAudioFFMpegLoader::initUsingContext( return true; } +auto AbstractAudioFFMpegLoader::replaceFrameAndRead( + not_null frame, + QByteArray &result, + int64 &samplesAdded) +-> ReadResult { + av_frame_free(&_frame); + _frame = frame; + return readFromReadyFrame(result, samplesAdded); +} + auto AbstractAudioFFMpegLoader::readFromReadyContext( not_null context, QByteArray &result, diff --git a/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.h b/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.h index 7d0711508..1a6fd8da4 100644 --- a/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.h +++ b/Telegram/SourceFiles/media/audio/media_audio_ffmpeg_loader.h @@ -99,6 +99,13 @@ protected: QByteArray &result, int64 &samplesAdded); + // Streaming player provides the first frame to the ChildFFMpegLoader + // so we replace our allocated frame with the one provided. + ReadResult replaceFrameAndRead( + not_null frame, + QByteArray &result, + int64 &samplesAdded); + int sampleSize() const { return _outputSampleSize; } diff --git a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.cpp b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.cpp index 82dcc377b..0284fd711 100644 --- a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.cpp +++ b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.cpp @@ -27,10 +27,12 @@ bool IsPlanarFormat(int format) { } // namespace VideoSoundData::~VideoSoundData() { + if (frame) { + av_frame_free(&frame); + } if (context) { avcodec_close(context); avcodec_free_context(&context); - context = nullptr; } } @@ -49,9 +51,28 @@ bool ChildFFMpegLoader::open(crl::time positionMs) { _parentData->frequency); } +AudioPlayerLoader::ReadResult ChildFFMpegLoader::readFromInitialFrame( + QByteArray &result, + int64 &samplesAdded) { + if (!_parentData->frame) { + return ReadResult::Wait; + } + return replaceFrameAndRead( + base::take(_parentData->frame), + result, + samplesAdded); +} + AudioPlayerLoader::ReadResult ChildFFMpegLoader::readMore( QByteArray &result, int64 &samplesAdded) { + const auto initialFrameResult = readFromInitialFrame( + result, + samplesAdded); + if (initialFrameResult != ReadResult::Wait) { + return initialFrameResult; + } + const auto readResult = readFromReadyContext( _parentData->context, result, diff --git a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h index 7858d7831..b7be2a286 100644 --- a/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h +++ b/Telegram/SourceFiles/media/audio/media_child_ffmpeg_loader.h @@ -11,6 +11,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL struct VideoSoundData { AVCodecContext *context = nullptr; + AVFrame *frame = nullptr; int32 frequency = Media::Player::kDefaultFrequency; int64 length = 0; ~VideoSoundData(); @@ -77,6 +78,13 @@ public: ~ChildFFMpegLoader(); private: + // Streaming player reads first frame by itself and provides it together + // with the codec context. So we first read data from this frame and + // only after that we try to read next packets. + ReadResult readFromInitialFrame( + QByteArray &result, + int64 &samplesAdded); + std::unique_ptr _parentData; QQueue _queue; bool _eofReached = false; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp new file mode 100644 index 000000000..6bc09f845 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp @@ -0,0 +1,130 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "media/streaming/media_streaming_audio_track.h" + +#include "media/streaming/media_streaming_utility.h" +#include "media/audio/media_audio.h" +#include "media/audio/media_child_ffmpeg_loader.h" +#include "media/player/media_player_instance.h" + +namespace Media { +namespace Streaming { + +AudioTrack::AudioTrack( + Stream &&stream, + FnMut ready, + Fn error) +: _stream(std::move(stream)) +, _ready(std::move(ready)) +, _error(std::move(error)) { + Expects(_ready != nullptr); + Expects(_error != nullptr); +} + +int AudioTrack::streamIndex() const { + // Thread-safe, because _stream.index is immutable. + return _stream.index; +} + +AVRational AudioTrack::streamTimeBase() const { + return _stream.timeBase; +} + +void AudioTrack::process(Packet &&packet) { + if (_audioMsgId.playId()) { + mixerEnqueue(std::move(packet)); + } else if (!tryReadFirstFrame(std::move(packet))) { + _error(); + } +} + +bool AudioTrack::tryReadFirstFrame(Packet &&packet) { + // #TODO streaming fix seek to the end. + const auto last = packet.empty(); + if (ProcessPacket(_stream, std::move(packet)).failed()) { + return false; + } + if (const auto error = ReadNextFrame(_stream)) { + return !last && (error.code() == AVERROR(EAGAIN)); + } + if (!fillStateFromFrame()) { + return false; + } + mixerInit(); + callReady(); + return true; +} + +bool AudioTrack::fillStateFromFrame() { + _state.position = _state.receivedTill = FramePosition(_stream); + return (_state.position != kTimeUnknown); +} + +void AudioTrack::mixerInit() { + Expects(!_audioMsgId.playId()); + + _audioMsgId = AudioMsgId::ForVideo(); + + auto data = std::make_unique(); + data->context = _stream.codec.release(); + data->frequency = _stream.frequency; + data->length = (_stream.duration * data->frequency) / 1000LL; + Media::Player::mixer()->play( + _audioMsgId, + std::move(data), + _state.position); +} + +void AudioTrack::callReady() { + Expects(_ready != nullptr); + + auto data = Information(); + data.audioDuration = _stream.duration; + data.state.audio = _state; + base::take(_ready)(data); +} + +void AudioTrack::mixerEnqueue(Packet &&packet) { + Media::Player::mixer()->feedFromVideo({ + &packet.fields(), + _audioMsgId + }); + packet.release(); +} + +void AudioTrack::start() { + Expects(_ready == nullptr); + Expects(_audioMsgId.playId() != 0); + // #TODO streaming support start() when paused. + Media::Player::mixer()->resume(_audioMsgId, true); +} + +rpl::producer AudioTrack::state() { + Expects(_ready == nullptr); + + if (!_subscription) { + auto &updated = Media::Player::instance()->updatedNotifier(); + _subscription = updated.add_subscription([=]( + const Media::Player::TrackState &state) { +// _state = state; + }); + //) | rpl::filter([](const State &state) { + // return !!state.id; + //}); + } + return rpl::single(_state); +} + +AudioTrack::~AudioTrack() { + if (_audioMsgId.playId()) { + Media::Player::mixer()->stop(_audioMsgId); + } +} + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h new file mode 100644 index 000000000..ef7bfa578 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h @@ -0,0 +1,74 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "media/streaming/media_streaming_utility.h" + +namespace Media { +namespace Player { +struct TrackState; +} // namespace Player + +namespace Streaming { + +class AudioTrack final { +public: + // Called from some unspecified thread. + // Callbacks are assumed to be thread-safe. + AudioTrack( + Stream &&stream, + FnMut ready, + Fn error); + + // Called from the main thread. + // Must be called after 'ready' was invoked. + void start(); + + // Called from the main thread. + // Non-const, because we subscribe to changes on the first call. + // Must be called after 'ready' was invoked. + [[nodiscard]] rpl::producer state(); + + // Thread-safe. + [[nodiscard]] int streamIndex() const; + [[nodiscard]] AVRational streamTimeBase() const; + + // Called from the same unspecified thread. + void process(Packet &&packet); + + // Called from the main thread. + ~AudioTrack(); + +private: + // Called from the same unspecified thread. + [[nodiscard]] bool tryReadFirstFrame(Packet &&packet); + [[nodiscard]] bool fillStateFromFrame(); + void mixerInit(); + void mixerEnqueue(Packet &&packet); + void callReady(); + + // Accessed from the same unspecified thread. + Stream _stream; + + // Assumed to be thread-safe. + FnMut _ready; + const Fn _error; + + // First set from the same unspecified thread before _ready is called. + // After that is immutable. + AudioMsgId _audioMsgId; + TrackState _state; + + // Accessed from the main thread. + base::Subscription _subscription; + rpl::event_stream _stateChanges; + +}; + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_common.h b/Telegram/SourceFiles/media/streaming/media_streaming_common.h index 913611d84..aea599f04 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_common.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_common.h @@ -10,7 +10,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace Media { namespace Streaming { -constexpr auto kTimeUnknown = crl::time(-1); +constexpr auto kTimeUnknown = std::numeric_limits::min(); enum class Mode { Both, @@ -19,19 +19,33 @@ enum class Mode { Inspection, }; +struct TrackState { + crl::time position = kTimeUnknown; + crl::time receivedTill = kTimeUnknown; +}; + +struct State { + TrackState video; + TrackState audio; +}; + struct Information { - crl::time videoStarted = kTimeUnknown; + State state; + crl::time videoDuration = kTimeUnknown; QSize videoSize; QImage videoCover; - int videoCoverRotation = 0; + int videoRotation = 0; - crl::time audioStarted = kTimeUnknown; crl::time audioDuration = kTimeUnknown; }; -struct RepaintRequest { - crl::time position; +struct UpdateVideo { + crl::time position = 0; +}; + +struct UpdateAudio { + crl::time position = 0; }; struct WaitingForData { @@ -43,7 +57,8 @@ struct MutedByOther { struct Update { base::variant< Information, - RepaintRequest, + UpdateVideo, + UpdateAudio, WaitingForData, MutedByOther> data; }; @@ -51,5 +66,26 @@ struct Update { struct Error { }; +struct FrameRequest { + QSize resize; + QSize outer; + ImageRoundRadius radius = ImageRoundRadius(); + RectParts corners = RectPart::AllCorners; + + bool empty() const { + return resize.isEmpty(); + } + + bool operator==(const FrameRequest &other) const { + return (resize == other.resize) + && (outer == other.outer) + && (radius == other.radius) + && (corners == other.corners); + } + bool operator!=(const FrameRequest &other) const { + return !(*this == other); + } +}; + } // namespace Streaming } // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp index d98f869c0..f716d5ddd 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -256,56 +256,6 @@ void File::Context::start(crl::time position) { // information.audio = (_audio.info != nullptr); // _information = std::move(information); //} -// -//QImage File::Context::readFirstVideoFrame() { -// auto result = QImage(); -// while (_video.info && result.isNull()) { -// auto frame = tryReadFirstVideoFrame(); -// if (unroll()) { -// return QImage(); -// } -// frame.match([&](QImage &image) { -// if (!image.isNull()) { -// result = std::move(image); -// } else { -// _video = StreamWrap(); -// } -// }, [&](const AvErrorWrap &error) { -// if (error.code() == AVERROR(EAGAIN)) { -// readNextPacket(); -// } else { -// _video = StreamWrap(); -// } -// }); -// } -// if (!_video.info && _mode == Mode::Video) { -// logFatal(qstr("RequiredStreamEmpty")); -// return QImage(); -// } -// return result; -//} -// -//base::variant File::Context::tryReadFirstVideoFrame() { -// Expects(_video.info != nullptr); -// -// if (unroll()) { -// return AvErrorWrap(); -// } -// const auto error = ReadNextFrame(_video.stream); -// if (error) { -// if (error->code() == AVERROR_EOF) { -// // No valid video stream. -// if (_mode == Mode::Video) { -// logFatal(qstr("RequiredStreamEmpty")); -// } -// return QImage(); -// } else if (error->code() != AVERROR(EAGAIN)) { -// fail(); -// } -// return *error; -// } -// return ConvertFrame(_video.stream, QSize(), QImage()); -//} void File::Context::readNextPacket() { auto result = readPacket(); @@ -377,7 +327,7 @@ void File::start(not_null delegate, crl::time position) { stop(); _context.emplace(delegate, &_reader); - _thread = std::thread([=, context = &_context.value()] { + _thread = std::thread([=, context = &*_context] { context->start(position); while (!context->finished()) { context->readNextPacket(); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp index 876931812..62c3d0973 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp @@ -9,77 +9,148 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "media/streaming/media_streaming_file.h" #include "media/streaming/media_streaming_loader.h" -#include "media/audio/media_audio.h" -#include "media/audio/media_child_ffmpeg_loader.h" +#include "media/streaming/media_streaming_audio_track.h" +#include "media/streaming/media_streaming_video_track.h" namespace Media { namespace Streaming { +namespace { -crl::time CountPacketPosition( - not_null info, - const Packet &packet) { - const auto &native = packet.fields(); - const auto packetPts = (native.pts == AV_NOPTS_VALUE) - ? native.dts - : native.pts; - const auto & timeBase = info->time_base; - return PtsToTime(packetPts, info->time_base); +void SaveValidInformation(Information &to, Information &&from) { + if (from.state.audio.position != kTimeUnknown) { + to.state.audio = from.state.audio; + } + if (from.audioDuration != kTimeUnknown) { + to.audioDuration = from.audioDuration; + } + if (from.state.video.position != kTimeUnknown) { + to.state.video = from.state.video; + } + if (from.videoDuration != kTimeUnknown) { + to.videoDuration = from.videoDuration; + } + if (!from.videoSize.isEmpty()) { + to.videoSize = from.videoSize; + } + if (!from.videoCover.isNull()) { + to.videoCover = std::move(from.videoCover); + to.videoRotation = from.videoRotation; + } } -// #TODO streaming -//void Enqueue(StreamWrap &wrap, Packet && packet) { -// const auto time = CountPacketPosition(wrap, packet); -// if (time != kTimeUnknown) { -// wrap.stream.lastReadPosition = time; -// } -// wrap.stream.queue.push_back(std::move(packet)); -//} +} // namespace Player::Player( not_null owner, std::unique_ptr loader) -: _file(std::make_unique(owner, std::move(loader))) { +: _file(std::make_unique(owner, std::move(loader))) +, _renderFrameTimer([=] { checkNextFrame(); }) { } not_null Player::delegate() { return static_cast(this); } -void Player::fileReady(Stream &&video, Stream &&audio) { - _audio = std::move(audio); - if (_audio.codec && (_mode == Mode::Audio || _mode == Mode::Both)) { - _audioMsgId = AudioMsgId::ForVideo(); +void Player::start() { + Expects(_stage == Stage::Ready); + + _stage = Stage::Started; + //if (_audio) { + // _audio->state( + // ) | rpl::start_with_next([](const TrackState & state) { + // }, _lifetime); + //} + if (_video) { + _video->renderNextFrame( + ) | rpl::start_with_next([=](crl::time when) { + _nextFrameTime = when; + checkNextFrame(); + }, _lifetime); + } + if (_audio) { + _audio->start(); + } + if (_video) { + _video->start(); + } +} + +void Player::checkNextFrame() { + Expects(_nextFrameTime != kTimeUnknown); + + const auto now = crl::now(); + if (now < _nextFrameTime) { + _renderFrameTimer.callOnce(_nextFrameTime - now); } else { - _audioMsgId = AudioMsgId(); + _renderFrameTimer.cancel(); + renderFrame(now); + } +} + +void Player::renderFrame(crl::time now) { + if (_video) { + _video->markFrameDisplayed(now); + _updates.fire({ UpdateVideo{ _nextFrameTime } }); + } +} + +void Player::fileReady(Stream &&video, Stream &&audio) { + const auto weak = base::make_weak(&_sessionGuard); + const auto ready = [=](const Information & data) { + crl::on_main(weak, [=, data = data]() mutable { + streamReady(std::move(data)); + }); + }; + const auto error = [&](auto &stream) { + return [=, &stream] { + crl::on_main(weak, [=, &stream] { + stream = nullptr; + streamFailed(); + }); + }; + }; + if (audio.codec && (_mode == Mode::Audio || _mode == Mode::Both)) { + _audio = std::make_unique( + std::move(audio), + ready, + error(_audio)); + } + if (video.codec && (_mode == Mode::Video || _mode == Mode::Both)) { + _video = std::make_unique( + std::move(video), + ready, + error(_video)); + } + if ((_mode == Mode::Audio && !_audio) + || (_mode == Mode::Video && !_video) + || (!_audio && !_video)) { + LOG(("Streaming Error: Required stream not found for mode %1." + ).arg(int(_mode))); + fileError(); } } void Player::fileError() { - + crl::on_main(&_sessionGuard, [=] { + fail(); + }); } bool Player::fileProcessPacket(Packet &&packet) { const auto &native = packet.fields(); + const auto index = native.stream_index; if (packet.empty()) { _readTillEnd = true; - } else if (native.stream_index == _audio.index) { - if (_audioMsgId.playId()) { - if (_audio.codec) { - const auto position = PtsToTime(native.pts, _audio.timeBase); - - auto data = std::make_unique(); - data->context = _audio.codec.release(); - data->frequency = _audio.frequency; - data->length = (_audio.duration * data->frequency) / 1000LL; - Media::Player::mixer()->play(_audioMsgId, std::move(data), position); - - // #TODO streaming resume when started playing - Media::Player::mixer()->resume(_audioMsgId, true); - } - Media::Player::mixer()->feedFromVideo({ &native, _audioMsgId }); - packet.release(); + if (_audio) { + _audio->process(Packet()); } - //const auto position = PtsToTime(native.pts, stream->timeBase); + if (_video) { + _video->process(Packet()); + } + } else if (_audio && _audio->streamIndex() == native.stream_index) { + _audio->process(std::move(packet)); + } else if (_video && _video->streamIndex() == native.stream_index) { + _video->process(std::move(packet)); } return fileReadMore(); } @@ -89,34 +160,98 @@ bool Player::fileReadMore() { return !_readTillEnd; } +void Player::streamReady(Information &&information) { + SaveValidInformation(_information, std::move(information)); + provideStartInformation(); +} + +void Player::streamFailed() { + if (_stage == Stage::Initializing) { + provideStartInformation(); + } else { + fail(); + } +} + +void Player::provideStartInformation() { + Expects(_stage == Stage::Initializing); + + if ((_audio && _information.audioDuration == kTimeUnknown) + || (_video && _information.videoDuration == kTimeUnknown)) { + return; // Not ready yet. + } else if ((!_audio && !_video) + || (!_audio && _mode == Mode::Audio) + || (!_video && _mode == Mode::Video)) { + fail(); + } else { + _stage = Stage::Ready; + _updates.fire(Update{ std::move(_information) }); + } +} + +void Player::fail() { + const auto stopGuarded = crl::guard(&_sessionGuard, [=] { stop(); }); + _stage = Stage::Failed; + _updates.fire_error({}); + stopGuarded(); +} + void Player::init(Mode mode, crl::time position) { stop(); _mode = mode; + _stage = Stage::Initializing; _file->start(delegate(), position); } void Player::pause() { - + _paused = true; + // #TODO streaming pause } void Player::resume() { - + _paused = false; + // #TODO streaming pause } void Player::stop() { _file->stop(); + _audio = nullptr; + _video = nullptr; + _paused = false; + invalidate_weak_ptrs(&_sessionGuard); + if (_stage != Stage::Failed) { + _stage = Stage::Uninitialized; + } _updates = rpl::event_stream(); } +bool Player::failed() const { + return (_stage == Stage::Failed); +} + bool Player::playing() const { - return false; + return (_stage == Stage::Started) && !_paused; +} + +bool Player::paused() const { + return _paused; } rpl::producer Player::updates() const { return _updates.events(); } +QImage Player::frame(const FrameRequest &request) const { + Expects(_video != nullptr); + + return _video->frame(request); +} + +rpl::lifetime &Player::lifetime() { + return _lifetime; +} + Player::~Player() = default; } // namespace Streaming diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.h b/Telegram/SourceFiles/media/streaming/media_streaming_player.h index 77c1e6f9f..32765977e 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_player.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.h @@ -9,9 +9,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "media/streaming/media_streaming_common.h" #include "media/streaming/media_streaming_file_delegate.h" - -// #TODO streaming move _audio away -#include "media/streaming/media_streaming_utility.h" +#include "base/weak_ptr.h" +#include "base/timer.h" namespace Data { class Session; @@ -22,40 +21,83 @@ namespace Streaming { class Loader; class File; +class AudioTrack; +class VideoTrack; class Player final : private FileDelegate { public: + // Public interfaces is used from the main thread. + Player(not_null owner, std::unique_ptr loader); + // Because we remember 'this' in calls to crl::on_main. + Player(const Player &other) = delete; + Player &operator=(const Player &other) = delete; + void init(Mode mode, crl::time position); + void start(); void pause(); void resume(); void stop(); - bool playing() const; + [[nodiscard]] bool failed() const; + [[nodiscard]] bool playing() const; + [[nodiscard]] bool paused() const; - rpl::producer updates() const; + [[nodiscard]] rpl::producer updates() const; + + [[nodiscard]] QImage frame(const FrameRequest &request) const; + + [[nodiscard]] rpl::lifetime &lifetime(); ~Player(); private: + enum class Stage { + Uninitialized, + Initializing, + Ready, + Started, + Failed + }; + + // Thread-safe. not_null delegate(); + // FileDelegate methods are called only from the File thread. void fileReady(Stream &&video, Stream &&audio) override; void fileError() override; - bool fileProcessPacket(Packet &&packet) override; bool fileReadMore() override; - const std::unique_ptr _file; - bool _readTillEnd = false; + // Called from the main thread. + void streamReady(Information &&information); + void streamFailed(); + void provideStartInformation(); + void fail(); + void checkNextFrame(); + void renderFrame(crl::time now); + const std::unique_ptr _file; + + // Immutable while File is active. + std::unique_ptr _audio; + std::unique_ptr _video; + base::has_weak_ptr _sessionGuard; Mode _mode = Mode::Both; - Stream _audio; - AudioMsgId _audioMsgId; + // Belongs to the File thread while File is active. + bool _readTillEnd = false; + // Belongs to the main thread. + Information _information; + Stage _stage = Stage::Uninitialized; + bool _paused = false; + + crl::time _nextFrameTime = kTimeUnknown; + base::Timer _renderFrameTimer; rpl::event_stream _updates; + rpl::lifetime _lifetime; }; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp index af5057aae..0acd8b624 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp @@ -110,6 +110,20 @@ SwsContextPointer MakeSwsContextPointer( not_null frame, QSize resize, SwsContextPointer *existing) { + // We have to use custom caching for SwsContext, because + // sws_getCachedContext checks passed flags with existing context flags, + // and re-creates context if they're different, but in the process of + // context creation the passed flags are modified before being written + // to the resulting context, so the caching doesn't work. + if (existing && (*existing) != nullptr) { + const auto &deleter = existing->get_deleter(); + if (deleter.resize == resize + && deleter.frameSize == QSize(frame->width, frame->height) + && deleter.frameFormat == frame->format) { + return std::move(*existing); + } + } + const auto result = sws_getCachedContext( existing ? existing->release() : nullptr, frame->width, @@ -125,7 +139,9 @@ SwsContextPointer MakeSwsContextPointer( if (!result) { LogError(qstr("sws_getCachedContext")); } - return SwsContextPointer(result); + return SwsContextPointer( + result, + { resize, QSize{ frame->width, frame->height }, frame->format }); } void SwsContextDeleter::operator()(SwsContext *value) { @@ -157,6 +173,15 @@ int64_t TimeToPts(crl::time time, AVRational timeBase) { : (time * timeBase.den) / (1000LL * timeBase.num); } +crl::time FramePosition(Stream &stream) { + const auto pts = !stream.frame + ? AV_NOPTS_VALUE + : (stream.frame->pts == AV_NOPTS_VALUE) + ? stream.frame->pkt_dts + : stream.frame->pts; + return PtsToTime(pts, stream.timeBase); +} + int ReadRotationFromMetadata(not_null stream) { const auto tag = av_dict_get(stream->metadata, "rotate", nullptr, 0); if (tag && *tag->value) { @@ -167,60 +192,66 @@ int ReadRotationFromMetadata(not_null stream) { return degrees; } } - return 90; + return 0; } bool RotationSwapWidthHeight(int rotation) { return (rotation == 90 || rotation == 270); } -std::optional ReadNextFrame(Stream &stream) { - Expects(stream.frame != nullptr); +AvErrorWrap ProcessPacket(Stream &stream, Packet &&packet) { + Expects(stream.codec != nullptr); auto error = AvErrorWrap(); - ClearFrameMemory(stream.frame.get()); - do { - error = avcodec_receive_frame(stream.codec.get(), stream.frame.get()); - if (!error) { - //processReadFrame(); // #TODO streaming - return std::nullopt; - } + const auto native = &packet.fields(); + const auto guard = gsl::finally([ + &, + size = native->size, + data = native->data + ] { + native->size = size; + native->data = data; + packet = Packet(); + }); - if (error.code() != AVERROR(EAGAIN) || stream.queue.empty()) { - return error; - } - - const auto packet = &stream.queue.front().fields(); - const auto guard = gsl::finally([ - &, - size = packet->size, - data = packet->data - ] { - packet->size = size; - packet->data = data; - stream.queue.pop_front(); - }); - - error = avcodec_send_packet( - stream.codec.get(), - packet->data ? packet : nullptr); // Drain on eof. - if (!error) { - continue; - } + error = avcodec_send_packet( + stream.codec.get(), + native->data ? native : nullptr); // Drain on eof. + if (error) { LogError(qstr("avcodec_send_packet"), error); if (error.code() == AVERROR_INVALIDDATA // There is a sample voice message where skipping such packet // results in a crash (read_access to nullptr) in swr_convert(). && stream.codec->codec_id != AV_CODEC_ID_OPUS) { if (++stream.invalidDataPackets < kSkipInvalidDataPackets) { - continue; // Try to skip a bad packet. + return AvErrorWrap(); // Try to skip a bad packet. } } - return error; - } while (true); + } + return error; +} - [[unreachable]]; +AvErrorWrap ReadNextFrame(Stream &stream) { + Expects(stream.frame != nullptr); + + auto error = AvErrorWrap(); + + do { + error = avcodec_receive_frame( + stream.codec.get(), + stream.frame.get()); + if (!error + || error.code() != AVERROR(EAGAIN) + || stream.queue.empty()) { + return error; + } + + error = ProcessPacket(stream, std::move(stream.queue.front())); + stream.queue.pop_front(); + } while (!error); + + return error; } QImage ConvertFrame( diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_utility.h b/Telegram/SourceFiles/media/streaming/media_streaming_utility.h index a1a90d540..2217f43e3 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_utility.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_utility.h @@ -25,9 +25,12 @@ public: AvErrorWrap(int code = 0) : _code(code) { } - [[nodiscard]] explicit operator bool() const { + [[nodiscard]] bool failed() const { return (_code < 0); } + [[nodiscard]] explicit operator bool() const { + return failed(); + } [[nodiscard]] int code() const { return _code; @@ -113,6 +116,10 @@ using FramePointer = std::unique_ptr; FramePointer MakeFramePointer(); struct SwsContextDeleter { + QSize resize; + QSize frameSize; + int frameFormat = int(AV_PIX_FMT_NONE); + void operator()(SwsContext *value); }; using SwsContextPointer = std::unique_ptr; @@ -128,7 +135,6 @@ struct Stream { CodecPointer codec; FramePointer frame; std::deque queue; - crl::time lastReadPosition = 0; int invalidDataPackets = 0; // Audio only. @@ -143,10 +149,12 @@ void LogError(QLatin1String method); void LogError(QLatin1String method, AvErrorWrap error); [[nodiscard]] crl::time PtsToTime(int64_t pts, AVRational timeBase); -[[nodiscard]] int64_t TimeToPts(int64_t pts, AVRational timeBase); +[[nodiscard]] int64_t TimeToPts(crl::time time, AVRational timeBase); +[[nodiscard]] crl::time FramePosition(Stream &stream); [[nodiscard]] int ReadRotationFromMetadata(not_null stream); [[nodiscard]] bool RotationSwapWidthHeight(int rotation); -[[nodiscard]] std::optional ReadNextFrame(Stream &stream); +[[nodiscard]] AvErrorWrap ProcessPacket(Stream &stream, Packet &&packet); +[[nodiscard]] AvErrorWrap ReadNextFrame(Stream &stream); [[nodiscard]] QImage ConvertFrame( Stream& stream, QSize resize, diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp new file mode 100644 index 000000000..a9f914ed9 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp @@ -0,0 +1,465 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "media/streaming/media_streaming_video_track.h" + +#include "base/concurrent_timer.h" + +namespace Media { +namespace Streaming { +namespace { + +constexpr auto kDisplaySkipped = crl::time(-1); +static_assert(kDisplaySkipped != kTimeUnknown); + +} // namespace + +class VideoTrackObject final { +public: + using Frame = VideoTrack::Frame; + using Shared = VideoTrack::Shared; + + VideoTrackObject( + crl::weak_on_queue weak, + not_null shared, + Stream &&stream, + FnMut ready, + Fn error); + + void process(Packet &&packet); + + [[nodisacrd]] rpl::producer displayFrameAt() const; + + void start(); + void interrupt(); + void frameDisplayed(); + +private: + [[nodiscard]] bool interrupted() const; + [[nodiscard]] bool tryReadFirstFrame(Packet &&packet); + [[nodiscard]] bool fillStateFromFrame(); + void queueReadFrames(crl::time delay = 0); + void readFrames(); + [[nodiscard]] bool readFrame(not_null frame); + void presentFrameIfNeeded(); + void callReady(); + + [[nodiscard]] crl::time trackTime() const; + + const crl::weak_on_queue _weak; + + // Main thread wrapper destructor will set _shared back to nullptr. + // All queued method calls after that should be discarded. + Shared *_shared = nullptr; + + Stream _stream; + bool _noMoreData = false; + FnMut _ready; + Fn _error; + crl::time _startedTime = kTimeUnknown; + crl::time _startedPosition = kTimeUnknown; + rpl::variable _nextFramePosition = kTimeUnknown; + + bool _queued = false; + base::ConcurrentTimer _readFramesTimer; + +}; + +VideoTrackObject::VideoTrackObject( + crl::weak_on_queue weak, + not_null shared, + Stream &&stream, + FnMut ready, + Fn error) +: _weak(std::move(weak)) +, _shared(shared) +, _stream(std::move(stream)) +, _ready(std::move(ready)) +, _error(std::move(error)) +, _readFramesTimer(_weak, [=] { readFrames(); }) { + Expects(_ready != nullptr); + Expects(_error != nullptr); +} + +rpl::producer VideoTrackObject::displayFrameAt() const { + return _nextFramePosition.value() | rpl::map([=](crl::time position) { + return _startedTime + (position - _startedPosition); + }); +} + +void VideoTrackObject::process(Packet &&packet) { + _noMoreData = packet.empty(); + if (interrupted()) { + return; + } else if (_shared->initialized()) { + _stream.queue.push_back(std::move(packet)); + queueReadFrames(); + } else if (!tryReadFirstFrame(std::move(packet))) { + _error(); + } +} + +void VideoTrackObject::queueReadFrames(crl::time delay) { + if (delay > 0) { + _readFramesTimer.callOnce(delay); + } else if (!_queued) { + _queued = true; + _weak.with([](VideoTrackObject &that) { + that._queued = false; + that.readFrames(); + }); + } +} + +void VideoTrackObject::readFrames() { + if (interrupted()) { + return; + } + const auto state = _shared->prepareState(trackTime()); + state.match([&](Shared::PrepareFrame frame) { + if (readFrame(frame)) { + presentFrameIfNeeded(); + } + }, [&](Shared::PrepareNextCheck delay) { + Expects(delay > 0); + + queueReadFrames(delay); + }, [&](std::nullopt_t) { + presentFrameIfNeeded(); + }); +} + +bool VideoTrackObject::readFrame(not_null frame) { + if (const auto error = ReadNextFrame(_stream)) { + if (error.code() == AVERROR_EOF) { + // read till end + } else if (error.code() != AVERROR(EAGAIN) || _noMoreData) { + interrupt(); + _error(); + } + return false; + } + const auto position = FramePosition(_stream); + if (position == kTimeUnknown) { + interrupt(); + _error(); + return false; + } + frame->original = ConvertFrame( + _stream, + QSize(), + std::move(frame->original)); + frame->position = position; + frame->displayed = kTimeUnknown; + + //frame->request + //frame->prepared + + return true; +} + +void VideoTrackObject::presentFrameIfNeeded() { + const auto presented = _shared->presentFrame(trackTime()); + if (presented.position != kTimeUnknown) { + _nextFramePosition = presented.position; + } + queueReadFrames(presented.nextCheckDelay); +} + +void VideoTrackObject::start() { + _startedTime = crl::now(); + queueReadFrames(); +} + +bool VideoTrackObject::interrupted() const { + return (_shared == nullptr); +} + +void VideoTrackObject::frameDisplayed() { + queueReadFrames(); +} + +bool VideoTrackObject::tryReadFirstFrame(Packet &&packet) { + if (ProcessPacket(_stream, std::move(packet)).failed()) { + return false; + } + if (const auto error = ReadNextFrame(_stream)) { + if (error.code() == AVERROR_EOF) { + // #TODO streaming fix seek to the end. + return false; + } else if (error.code() != AVERROR(EAGAIN) || _noMoreData) { + return false; + } + return true; + } else if (!fillStateFromFrame()) { + return false; + } + auto frame = ConvertFrame(_stream, QSize(), QImage()); + if (frame.isNull()) { + return false; + } + _shared->init(std::move(frame), _startedPosition); + callReady(); + if (!_stream.queue.empty()) { + queueReadFrames(); + } + return true; +} + +bool VideoTrackObject::fillStateFromFrame() { + _startedPosition = FramePosition(_stream); + _nextFramePosition = _startedPosition; + return (_startedPosition != kTimeUnknown); +} + +void VideoTrackObject::callReady() { + Expects(_ready != nullptr); + + const auto frame = _shared->frameForPaint(); + Assert(frame != nullptr); + + auto data = Information(); + data.videoDuration = _stream.duration; + data.videoSize = frame->original.size(); + if (RotationSwapWidthHeight(_stream.rotation)) { + data.videoSize.transpose(); + } + data.videoCover = frame->original; + data.videoRotation = _stream.rotation; + data.state.video.position = _startedPosition; + base::take(_ready)(data); +} + +crl::time VideoTrackObject::trackTime() const { + return _startedPosition + + (_startedTime != kTimeUnknown ? (crl::now() - _startedTime) : 0); +} + +void VideoTrackObject::interrupt() { + _shared = nullptr; +} + +void VideoTrack::Shared::init(QImage &&cover, crl::time position) { + Expects(!initialized()); + + _frames[0].original = std::move(cover); + _frames[0].position = position; + + // Usually main thread sets displayed time before _counter increment. + // But in this case we update _counter, so we set a fake displayed time. + _frames[0].displayed = kDisplaySkipped; + + _counter.store(0, std::memory_order_release); +} + +int VideoTrack::Shared::counter() const { + return _counter.load(std::memory_order_acquire); +} + +bool VideoTrack::Shared::initialized() const { + return (counter() != kCounterUninitialized); +} + +not_null VideoTrack::Shared::getFrame(int index) { + Expects(index >= 0 && index < kFramesCount); + + return &_frames[index]; +} + +bool VideoTrack::Shared::IsPrepared(not_null frame) { + return (frame->position != kTimeUnknown) + && (frame->displayed == kTimeUnknown) + && !frame->original.isNull(); +} + +bool VideoTrack::Shared::IsStale( + not_null frame, + crl::time trackTime) { + Expects(IsPrepared(frame)); + + return (frame->position < trackTime); +} + +auto VideoTrack::Shared::prepareState(crl::time trackTime) -> PrepareState { + const auto prepareNext = [&](int index) -> PrepareState { + const auto frame = getFrame(index); + const auto next = getFrame((index + 1) % kFramesCount); + if (!IsPrepared(frame)) { + return frame; + } else if (IsStale(frame, trackTime)) { + std::swap(*frame, *next); + next->displayed = kDisplaySkipped; + return IsPrepared(frame) ? next : frame; + } else if (!IsPrepared(next)) { + return next; + } else { + return PrepareNextCheck(frame->position - trackTime + 1); + } + }; + const auto finishPrepare = [&](int index) { + const auto frame = getFrame(index); + // If player already awaits next frame - we ignore if it's stale. + return IsPrepared(frame) ? std::nullopt : PrepareState(frame); + }; + + switch (counter()) { + case 0: return finishPrepare(1); + case 1: return prepareNext(2); + case 2: return finishPrepare(2); + case 3: return prepareNext(3); + case 4: return finishPrepare(3); + case 5: return prepareNext(0); + case 6: return finishPrepare(0); + case 7: return prepareNext(1); + } + Unexpected("Counter value in VideoTrack::Shared::prepareState."); +} + +auto VideoTrack::Shared::presentFrame(crl::time trackTime) -> PresentFrame { + const auto present = [&](int counter, int index) -> PresentFrame { + const auto frame = getFrame(index); + Assert(IsPrepared(frame)); + const auto position = frame->position; + + _counter.store( + (counter + 1) % (2 * kFramesCount), + std::memory_order_release); + return { position, crl::time(0) }; + }; + const auto nextCheckDelay = [&](int index) -> PresentFrame { + const auto frame = getFrame(index); + const auto next = getFrame((index + 1) % kFramesCount); + if (!IsPrepared(frame) + || !IsPrepared(next) + || IsStale(frame, trackTime)) { + return { kTimeUnknown, crl::time(0) }; + } + return { kTimeUnknown, (trackTime - frame->position + 1) }; + }; + + switch (counter()) { + case 0: return present(0, 1); + case 1: return nextCheckDelay(2); + case 2: return present(2, 2); + case 3: return nextCheckDelay(3); + case 4: return present(4, 3); + case 5: return nextCheckDelay(0); + case 6: return present(6, 0); + case 7: return nextCheckDelay(1); + } + Unexpected("Counter value in VideoTrack::Shared::prepareState."); +} + +bool VideoTrack::Shared::markFrameDisplayed(crl::time now) { + const auto markAndJump = [&](int counter, int index) { + const auto frame = getFrame(index); + if (frame->displayed == kTimeUnknown) { + frame->displayed = now; + } + _counter.store( + (counter + 1) % (2 * kFramesCount), + std::memory_order_release); + return true; + }; + + + switch (counter()) { + case 0: return false; + case 1: return markAndJump(1, 1); + case 2: return false; + case 3: return markAndJump(3, 2); + case 4: return false; + case 5: return markAndJump(5, 3); + case 6: return false; + case 7: return markAndJump(7, 0); + } + Unexpected("Counter value in VideoTrack::Shared::markFrameDisplayed."); +} + +not_null VideoTrack::Shared::frameForPaint() { + // #TODO streaming optimize mark as displayed if possible + return getFrame(counter() / 2); +} + +VideoTrack::VideoTrack( + Stream &&stream, + FnMut ready, + Fn error) +: _streamIndex(stream.index) +, _streamTimeBase(stream.timeBase) +//, _streamRotation(stream.rotation) +, _shared(std::make_unique()) +, _wrapped( + _shared.get(), + std::move(stream), + std::move(ready), + std::move(error)) { +} + +int VideoTrack::streamIndex() const { + return _streamIndex; +} + +AVRational VideoTrack::streamTimeBase() const { + return _streamTimeBase; +} + +void VideoTrack::process(Packet &&packet) { + _wrapped.with([ + packet = std::move(packet) + ](Implementation &unwrapped) mutable { + unwrapped.process(std::move(packet)); + }); +} + +void VideoTrack::start() { + _wrapped.with([](Implementation &unwrapped) { + unwrapped.start(); + }); +} + +void VideoTrack::markFrameDisplayed(crl::time now) { + if (!_shared->markFrameDisplayed(now)) { + return; + } + _wrapped.with([](Implementation &unwrapped) { + unwrapped.frameDisplayed(); + }); +} + +QImage VideoTrack::frame(const FrameRequest &request) const { + const auto frame = _shared->frameForPaint(); + Assert(frame != nullptr); + Assert(!frame->original.isNull()); + + if (request.resize.isEmpty()) { + return frame->original; + } else if (frame->prepared.isNull() || frame->request != request) { + // #TODO streaming prepare frame + //frame->request = request; + //frame->prepared = PrepareFrame( + // frame->original, + // request, + // std::move(frame->prepared)); + } + return frame->prepared; +} + +rpl::producer VideoTrack::renderNextFrame() const { + return _wrapped.producer_on_main([](const Implementation &unwrapped) { + return unwrapped.displayFrameAt(); + }); +} + +VideoTrack::~VideoTrack() { + _wrapped.with([shared = std::move(_shared)](Implementation &unwrapped) { + unwrapped.interrupt(); + }); +} + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h new file mode 100644 index 000000000..f96dde413 --- /dev/null +++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h @@ -0,0 +1,107 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "media/streaming/media_streaming_utility.h" + +#include + +namespace Media { +namespace Streaming { + +class VideoTrackObject; + +class VideoTrack final { +public: + // Called from some unspecified thread. + // Callbacks are assumed to be thread-safe. + VideoTrack( + Stream &&stream, + FnMut ready, + Fn error); + + // Thread-safe. + [[nodiscard]] int streamIndex() const; + [[nodiscard]] AVRational streamTimeBase() const; + + // Called from the same unspecified thread. + void process(Packet &&packet); + + // Called from the main thread. + void start(); + void markFrameDisplayed(crl::time now); + [[nodiscard]] QImage frame(const FrameRequest &request) const; + [[nodiscard]] rpl::producer renderNextFrame() const; + + // Called from the main thread. + ~VideoTrack(); + +private: + friend class VideoTrackObject; + + struct Frame { + QImage original; + crl::time position = kTimeUnknown; + //crl::time presentation = kTimeUnknown; + crl::time displayed = kTimeUnknown; + + FrameRequest request; + QImage prepared; + }; + + class Shared { + public: + using PrepareFrame = not_null; + using PrepareNextCheck = crl::time; + using PrepareState = base::optional_variant< + PrepareFrame, + PrepareNextCheck>; + struct PresentFrame { + crl::time position = kTimeUnknown; + crl::time nextCheckDelay = 0; + }; + + // Called from the wrapped object queue. + void init(QImage &&cover, crl::time position); + [[nodiscard]] bool initialized() const; + + [[nodiscard]] PrepareState prepareState(crl::time trackTime); + [[nodiscard]] PresentFrame presentFrame(crl::time trackTime); + + // Called from the main thread. + [[nodiscard]] bool markFrameDisplayed(crl::time now); + [[nodiscard]] not_null frameForPaint(); + + private: + [[nodiscard]] not_null getFrame(int index); + [[nodiscard]] static bool IsPrepared(not_null frame); + [[nodiscard]] static bool IsStale( + not_null frame, + crl::time trackTime); + [[nodiscard]] int counter() const; + + static constexpr auto kCounterUninitialized = -1; + std::atomic _counter = kCounterUninitialized; + + static constexpr auto kFramesCount = 4; + std::array _frames; + + }; + + const int _streamIndex = 0; + const AVRational _streamTimeBase; + //const int _streamRotation = 0; + std::unique_ptr _shared; + + using Implementation = VideoTrackObject; + crl::object_on_queue _wrapped; + +}; + +} // namespace Streaming +} // namespace Media diff --git a/Telegram/SourceFiles/rpl/range.h b/Telegram/SourceFiles/rpl/range.h index d37b5afca..790e168b0 100644 --- a/Telegram/SourceFiles/rpl/range.h +++ b/Telegram/SourceFiles/rpl/range.h @@ -12,9 +12,9 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace rpl { -template +template inline auto single(Value &&value) { - return make_producer>([ + return make_producer, Error>([ value = std::forward(value) ](const auto &consumer) mutable { consumer.put_next(std::move(value)); @@ -23,8 +23,9 @@ inline auto single(Value &&value) { }); } +template inline auto single() { - return make_producer<>([](const auto &consumer) { + return make_producer([](const auto &consumer) { consumer.put_next({}); consumer.put_done(); return lifetime(); diff --git a/Telegram/gyp/telegram_sources.txt b/Telegram/gyp/telegram_sources.txt index a16679957..457882f06 100644 --- a/Telegram/gyp/telegram_sources.txt +++ b/Telegram/gyp/telegram_sources.txt @@ -454,6 +454,8 @@ <(src_loc)/media/player/media_player_volume_controller.h <(src_loc)/media/player/media_player_widget.cpp <(src_loc)/media/player/media_player_widget.h +<(src_loc)/media/streaming/media_streaming_audio_track.cpp +<(src_loc)/media/streaming/media_streaming_audio_track.h <(src_loc)/media/streaming/media_streaming_common.h <(src_loc)/media/streaming/media_streaming_file.cpp <(src_loc)/media/streaming/media_streaming_file.h @@ -468,6 +470,8 @@ <(src_loc)/media/streaming/media_streaming_reader.h <(src_loc)/media/streaming/media_streaming_utility.cpp <(src_loc)/media/streaming/media_streaming_utility.h +<(src_loc)/media/streaming/media_streaming_video_track.cpp +<(src_loc)/media/streaming/media_streaming_video_track.h <(src_loc)/media/view/media_clip_controller.cpp <(src_loc)/media/view/media_clip_controller.h <(src_loc)/media/view/media_clip_playback.cpp