From 26ea6c4e63d37842013ab02e4abdee1665125f0c Mon Sep 17 00:00:00 2001 From: John Preston Date: Wed, 20 Feb 2019 17:28:48 +0400 Subject: [PATCH] Provide receivedTill for streamed tracks. --- Telegram/SourceFiles/data/data_document.cpp | 8 +- .../streaming/media_streaming_audio_track.cpp | 73 +++++--- .../streaming/media_streaming_audio_track.h | 10 +- .../media/streaming/media_streaming_common.h | 43 +++-- .../media/streaming/media_streaming_file.cpp | 64 +++---- .../media/streaming/media_streaming_file.h | 1 + .../streaming/media_streaming_player.cpp | 162 ++++++++++++++---- .../media/streaming/media_streaming_player.h | 20 ++- .../streaming/media_streaming_utility.cpp | 15 +- .../media/streaming/media_streaming_utility.h | 7 +- .../streaming/media_streaming_video_track.cpp | 96 +++++++---- .../streaming/media_streaming_video_track.h | 10 +- Telegram/SourceFiles/rpl/event_stream.h | 4 +- Telegram/SourceFiles/rpl/variable.h | 28 ++- 14 files changed, 377 insertions(+), 164 deletions(-) diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp index e6dcb7bf5..8059acac9 100644 --- a/Telegram/SourceFiles/data/data_document.cpp +++ b/Telegram/SourceFiles/data/data_document.cpp @@ -330,7 +330,7 @@ void StartStreaming( player->updates( ) | rpl::start_with_next_error_done([=](Update &&update) { update.data.match([&](Information &update) { - if (!update.videoCover.isNull()) { + if (!update.video.cover.isNull()) { video = base::make_unique_q(); video->setAttribute(Qt::WA_OpaquePaintEvent); video->paintRequest( @@ -340,8 +340,8 @@ void StartStreaming( player->frame(FrameRequest())); }, video->lifetime()); const auto size = QSize( - ConvertScale(update.videoSize.width()), - ConvertScale(update.videoSize.height())); + ConvertScale(update.video.size.width()), + ConvertScale(update.video.size.height())); const auto center = App::wnd()->geometry().center(); video->setGeometry(QRect( center - QPoint(size.width(), size.height()) / 2, @@ -355,10 +355,12 @@ void StartStreaming( }, video->lifetime()); } player->start(); + }, [&](PreloadedVideo &update) { }, [&](UpdateVideo &update) { Expects(video != nullptr); video->update(); + }, [&](PreloadedAudio &update) { }, [&](UpdateAudio &update) { }, [&](WaitingForData &update) { }, [&](MutedByOther &update) { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp index 6bc09f845..960c37a90 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.cpp @@ -36,6 +36,7 @@ AVRational AudioTrack::streamTimeBase() const { } void AudioTrack::process(Packet &&packet) { + _noMoreData = packet.empty(); if (_audioMsgId.playId()) { mixerEnqueue(std::move(packet)); } else if (!tryReadFirstFrame(std::move(packet))) { @@ -45,14 +46,18 @@ void AudioTrack::process(Packet &&packet) { bool AudioTrack::tryReadFirstFrame(Packet &&packet) { // #TODO streaming fix seek to the end. - const auto last = packet.empty(); if (ProcessPacket(_stream, std::move(packet)).failed()) { return false; } if (const auto error = ReadNextFrame(_stream)) { - return !last && (error.code() == AVERROR(EAGAIN)); - } - if (!fillStateFromFrame()) { + if (error.code() == AVERROR_EOF) { + // #TODO streaming fix seek to the end. + return false; + } else if (error.code() != AVERROR(EAGAIN) || _noMoreData) { + return false; + } + return true; + } else if (!fillStateFromFrame()) { return false; } mixerInit(); @@ -61,8 +66,8 @@ bool AudioTrack::tryReadFirstFrame(Packet &&packet) { } bool AudioTrack::fillStateFromFrame() { - _state.position = _state.receivedTill = FramePosition(_stream); - return (_state.position != kTimeUnknown); + _startedPosition = FramePosition(_stream); + return (_startedPosition != kTimeUnknown); } void AudioTrack::mixerInit() { @@ -77,16 +82,19 @@ void AudioTrack::mixerInit() { Media::Player::mixer()->play( _audioMsgId, std::move(data), - _state.position); + _startedPosition); } void AudioTrack::callReady() { Expects(_ready != nullptr); - auto data = Information(); - data.audioDuration = _stream.duration; - data.state.audio = _state; - base::take(_ready)(data); + auto data = AudioInformation(); + data.state.duration = _stream.duration; + data.state.position = _startedPosition; + data.state.receivedTill = _noMoreData + ? _stream.duration + : _startedPosition; + base::take(_ready)({ VideoInformation(), data }); } void AudioTrack::mixerEnqueue(Packet &&packet) { @@ -100,24 +108,49 @@ void AudioTrack::mixerEnqueue(Packet &&packet) { void AudioTrack::start() { Expects(_ready == nullptr); Expects(_audioMsgId.playId() != 0); + // #TODO streaming support start() when paused. Media::Player::mixer()->resume(_audioMsgId, true); } -rpl::producer AudioTrack::state() { +rpl::producer AudioTrack::playPosition() { Expects(_ready == nullptr); if (!_subscription) { - auto &updated = Media::Player::instance()->updatedNotifier(); - _subscription = updated.add_subscription([=]( - const Media::Player::TrackState &state) { -// _state = state; + _subscription = Media::Player::Updated( + ).add_subscription([=](const AudioMsgId &id) { + using State = Media::Player::State; + if (id != _audioMsgId) { + return; + } + const auto type = AudioMsgId::Type::Video; + const auto state = Media::Player::mixer()->currentState(type); + if (state.id != _audioMsgId) { + // #TODO streaming muted by other + return; + } else switch (state.state) { + case State::Stopped: + case State::StoppedAtEnd: + case State::PausedAtEnd: + _playPosition.reset(); + return; + case State::StoppedAtError: + case State::StoppedAtStart: + _error(); + return; + case State::Starting: + case State::Playing: + case State::Stopping: + case State::Pausing: + case State::Resuming: + _playPosition = state.position * 1000 / state.frequency; + return; + case State::Paused: + return; + } }); - //) | rpl::filter([](const State &state) { - // return !!state.id; - //}); } - return rpl::single(_state); + return _playPosition.value(); } AudioTrack::~AudioTrack() { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h index ef7bfa578..267312bca 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_audio_track.h @@ -10,9 +10,6 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "media/streaming/media_streaming_utility.h" namespace Media { -namespace Player { -struct TrackState; -} // namespace Player namespace Streaming { @@ -32,7 +29,7 @@ public: // Called from the main thread. // Non-const, because we subscribe to changes on the first call. // Must be called after 'ready' was invoked. - [[nodiscard]] rpl::producer state(); + [[nodiscard]] rpl::producer playPosition(); // Thread-safe. [[nodiscard]] int streamIndex() const; @@ -54,6 +51,7 @@ private: // Accessed from the same unspecified thread. Stream _stream; + bool _noMoreData = false; // Assumed to be thread-safe. FnMut _ready; @@ -62,11 +60,11 @@ private: // First set from the same unspecified thread before _ready is called. // After that is immutable. AudioMsgId _audioMsgId; - TrackState _state; + crl::time _startedPosition = kTimeUnknown; // Accessed from the main thread. base::Subscription _subscription; - rpl::event_stream _stateChanges; + rpl::variable _playPosition; }; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_common.h b/Telegram/SourceFiles/media/streaming/media_streaming_common.h index aea599f04..d8ceeab19 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_common.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_common.h @@ -12,6 +12,9 @@ namespace Streaming { constexpr auto kTimeUnknown = std::numeric_limits::min(); +class VideoTrack; +class AudioTrack; + enum class Mode { Both, Audio, @@ -22,32 +25,40 @@ enum class Mode { struct TrackState { crl::time position = kTimeUnknown; crl::time receivedTill = kTimeUnknown; + crl::time duration = kTimeUnknown; }; -struct State { - TrackState video; - TrackState audio; +struct VideoInformation { + TrackState state; + QSize size; + QImage cover; + int rotation = 0; +}; + +struct AudioInformation { + TrackState state; }; struct Information { - State state; - - crl::time videoDuration = kTimeUnknown; - QSize videoSize; - QImage videoCover; - int videoRotation = 0; - - crl::time audioDuration = kTimeUnknown; + VideoInformation video; + AudioInformation audio; }; -struct UpdateVideo { - crl::time position = 0; +template +struct PreloadedUpdate { + crl::time till = kTimeUnknown; }; -struct UpdateAudio { - crl::time position = 0; +template +struct PlaybackUpdate { + crl::time position = kTimeUnknown; }; +using PreloadedVideo = PreloadedUpdate; +using UpdateVideo = PlaybackUpdate; +using PreloadedAudio = PreloadedUpdate; +using UpdateAudio = PlaybackUpdate; + struct WaitingForData { }; @@ -57,7 +68,9 @@ struct MutedByOther { struct Update { base::variant< Information, + PreloadedVideo, UpdateVideo, + PreloadedAudio, UpdateAudio, WaitingForData, MutedByOther> data; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp index f716d5ddd..94ddcfd94 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -128,8 +128,11 @@ Stream File::Context::initStream(AVMediaType type) { } result.timeBase = info->time_base; result.duration = (info->duration != AV_NOPTS_VALUE) - ? PtsToTime(info->duration, result.timeBase) - : PtsToTime(_formatContext->duration, kUniversalTimeBase); + ? PtsToTimeCeil(info->duration, result.timeBase) + : PtsToTimeCeil(_formatContext->duration, kUniversalTimeBase); + if (result.duration == kTimeUnknown || !result.duration) { + return {}; + } return result; } @@ -217,52 +220,17 @@ void File::Context::start(crl::time position) { _delegate->fileReady(std::move(video), std::move(audio)); } -//void File::Context::readInformation(crl::time position) { -// auto information = Information(); -// auto result = readPacket(); -// const auto packet = base::get_if(&result); -// if (unroll()) { -// return; -// } else if (packet) { -// if (position > 0) { -// const auto time = CountPacketPosition( -// _formatContext->streams[packet->fields().stream_index], -// *packet); -// information.started = (time == Information::kDurationUnknown) -// ? position -// : time; -// } -// } else { -// information.started = position; -// } -// -// if (packet) { -// processPacket(std::move(*packet)); -// } else { -// enqueueEofPackets(); -// } -// -// information.cover = readFirstVideoFrame(); -// if (unroll()) { -// return; -// } else if (!information.cover.isNull()) { -// information.video = information.cover.size(); -// information.rotation = _video.stream.rotation; -// if (RotationSwapWidthHeight(information.rotation)) { -// information.video.transpose(); -// } -// } -// -// information.audio = (_audio.info != nullptr); -// _information = std::move(information); -//} - void File::Context::readNextPacket() { auto result = readPacket(); if (unroll()) { return; } else if (const auto packet = base::get_if(&result)) { const auto more = _delegate->fileProcessPacket(std::move(*packet)); + if (!more) { + do { + _semaphore.acquire(); + } while (!unroll() && !_delegate->fileReadMore()); + } } else { // Still trying to read by drain. Assert(result.is()); @@ -271,8 +239,8 @@ void File::Context::readNextPacket() { } } void File::Context::handleEndOfFile() { - // #TODO streaming looping const auto more = _delegate->fileProcessPacket(Packet()); + // #TODO streaming looping _readTillEnd = true; } @@ -281,6 +249,10 @@ void File::Context::interrupt() { _semaphore.release(); } +void File::Context::wake() { + _semaphore.release(); +} + bool File::Context::interrupted() const { return _interrupted; } @@ -339,6 +311,12 @@ void File::start(not_null delegate, crl::time position) { }); } +void File::wake() { + Expects(_context.has_value()); + + _context->wake(); +} + void File::stop() { if (_thread.joinable()) { _context->interrupt(); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.h b/Telegram/SourceFiles/media/streaming/media_streaming_file.h index c75cc8dc8..5a61a6318 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.h @@ -47,6 +47,7 @@ private: void readNextPacket(); void interrupt(); + void wake(); [[nodiscard]] bool interrupted() const; [[nodiscard]] bool failed() const; [[nodiscard]] bool finished() const; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp index 62c3d0973..f9a978750 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp @@ -16,25 +16,45 @@ namespace Media { namespace Streaming { namespace { -void SaveValidInformation(Information &to, Information &&from) { - if (from.state.audio.position != kTimeUnknown) { - to.state.audio = from.state.audio; +void SaveValidStateInformation(TrackState &to, TrackState &&from) { + Expects(from.position != kTimeUnknown); + Expects(from.receivedTill != kTimeUnknown); + Expects(from.duration != kTimeUnknown); + + to.duration = from.duration; + to.position = from.position; + to.receivedTill = (to.receivedTill == kTimeUnknown) + ? from.receivedTill + : std::clamp( + std::max(from.receivedTill, to.receivedTill), + to.position, + to.duration); +} + +void SaveValidAudioInformation( + AudioInformation &to, + AudioInformation &&from) { + SaveValidStateInformation(to.state, std::move(from.state)); +} + +void SaveValidVideoInformation( + VideoInformation &to, + VideoInformation &&from) { + Expects(!from.size.isEmpty()); + Expects(!from.cover.isNull()); + + SaveValidStateInformation(to.state, std::move(from.state)); + to.size = from.size; + to.cover = std::move(from.cover); + to.rotation = from.rotation; +} + +void SaveValidStartInformation(Information &to, Information &&from) { + if (from.audio.state.duration != kTimeUnknown) { + SaveValidAudioInformation(to.audio, std::move(from.audio)); } - if (from.audioDuration != kTimeUnknown) { - to.audioDuration = from.audioDuration; - } - if (from.state.video.position != kTimeUnknown) { - to.state.video = from.state.video; - } - if (from.videoDuration != kTimeUnknown) { - to.videoDuration = from.videoDuration; - } - if (!from.videoSize.isEmpty()) { - to.videoSize = from.videoSize; - } - if (!from.videoCover.isNull()) { - to.videoCover = std::move(from.videoCover); - to.videoRotation = from.videoRotation; + if (from.video.state.duration != kTimeUnknown) { + SaveValidVideoInformation(to.video, std::move(from.video)); } } @@ -55,16 +75,21 @@ void Player::start() { Expects(_stage == Stage::Ready); _stage = Stage::Started; - //if (_audio) { - // _audio->state( - // ) | rpl::start_with_next([](const TrackState & state) { - // }, _lifetime); - //} + if (_audio) { + _audio->playPosition( + ) | rpl::start_with_next_done([=](crl::time position) { + audioPlayedTill(position); + }, [=] { + // audio finished + }, _lifetime); + } if (_video) { _video->renderNextFrame( - ) | rpl::start_with_next([=](crl::time when) { + ) | rpl::start_with_next_done([=](crl::time when) { _nextFrameTime = when; checkNextFrame(); + }, [=] { + // video finished }, _lifetime); } if (_audio) { @@ -89,11 +114,69 @@ void Player::checkNextFrame() { void Player::renderFrame(crl::time now) { if (_video) { - _video->markFrameDisplayed(now); - _updates.fire({ UpdateVideo{ _nextFrameTime } }); + const auto position = _video->markFrameDisplayed(now); + if (position != kTimeUnknown) { + videoPlayedTill(position); + } } } +template +void Player::trackReceivedTill( + const Track &track, + TrackState &state, + crl::time position) { + if (position == kTimeUnknown) { + return; + } else if (state.duration != kTimeUnknown) { + position = std::clamp(position, 0LL, state.duration); + if (state.receivedTill < position) { + state.receivedTill = position; + _updates.fire({ PreloadedUpdate{ position } }); + } + } else { + state.receivedTill = position; + } +} + +template +void Player::trackPlayedTill( + const Track &track, + TrackState &state, + crl::time position) { + const auto guard = base::make_weak(&_sessionGuard); + trackReceivedTill(track, state, position); + if (guard && position != kTimeUnknown) { + position = std::clamp(position, 0LL, state.duration); + state.position = position; + _updates.fire({ PlaybackUpdate{ position } }); + } +} + +void Player::audioReceivedTill(crl::time position) { + Expects(_audio != nullptr); + + trackReceivedTill(*_audio, _information.audio.state, position); +} + +void Player::audioPlayedTill(crl::time position) { + Expects(_audio != nullptr); + + trackPlayedTill(*_audio, _information.audio.state, position); +} + +void Player::videoReceivedTill(crl::time position) { + Expects(_video != nullptr); + + trackReceivedTill(*_video, _information.video.state, position); +} + +void Player::videoPlayedTill(crl::time position) { + Expects(_video != nullptr); + + trackPlayedTill(*_video, _information.video.state, position); +} + void Player::fileReady(Stream &&video, Stream &&audio) { const auto weak = base::make_weak(&_sessionGuard); const auto ready = [=](const Information & data) { @@ -142,14 +225,28 @@ bool Player::fileProcessPacket(Packet &&packet) { if (packet.empty()) { _readTillEnd = true; if (_audio) { + crl::on_main(&_sessionGuard, [=] { + audioReceivedTill(kReceivedTillEnd); + }); _audio->process(Packet()); } if (_video) { + crl::on_main(&_sessionGuard, [=] { + videoReceivedTill(kReceivedTillEnd); + }); _video->process(Packet()); } } else if (_audio && _audio->streamIndex() == native.stream_index) { + const auto time = PacketPosition(packet, _audio->streamTimeBase()); + crl::on_main(&_sessionGuard, [=] { + audioReceivedTill(time); + }); _audio->process(std::move(packet)); } else if (_video && _video->streamIndex() == native.stream_index) { + const auto time = PacketPosition(packet, _video->streamTimeBase()); + crl::on_main(&_sessionGuard, [=] { + videoReceivedTill(time); + }); _video->process(std::move(packet)); } return fileReadMore(); @@ -161,7 +258,7 @@ bool Player::fileReadMore() { } void Player::streamReady(Information &&information) { - SaveValidInformation(_information, std::move(information)); + SaveValidStartInformation(_information, std::move(information)); provideStartInformation(); } @@ -176,8 +273,8 @@ void Player::streamFailed() { void Player::provideStartInformation() { Expects(_stage == Stage::Initializing); - if ((_audio && _information.audioDuration == kTimeUnknown) - || (_video && _information.videoDuration == kTimeUnknown)) { + if ((_audio && _information.audio.state.duration == kTimeUnknown) + || (_video && _information.video.state.duration == kTimeUnknown)) { return; // Not ready yet. } else if ((!_audio && !_video) || (!_audio && _mode == Mode::Audio) @@ -185,7 +282,12 @@ void Player::provideStartInformation() { fail(); } else { _stage = Stage::Ready; - _updates.fire(Update{ std::move(_information) }); + + // Don't keep the reference to the video cover. + auto copy = _information; + _information.video.cover = QImage(); + + _updates.fire(Update{ std::move(copy) }); } } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.h b/Telegram/SourceFiles/media/streaming/media_streaming_player.h index 32765977e..612b01f7b 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_player.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.h @@ -27,7 +27,6 @@ class VideoTrack; class Player final : private FileDelegate { public: // Public interfaces is used from the main thread. - Player(not_null owner, std::unique_ptr loader); // Because we remember 'this' in calls to crl::on_main. @@ -77,9 +76,28 @@ private: void fail(); void checkNextFrame(); void renderFrame(crl::time now); + void audioReceivedTill(crl::time position); + void audioPlayedTill(crl::time position); + void videoReceivedTill(crl::time position); + void videoPlayedTill(crl::time position); + + template + void trackReceivedTill( + const Track &track, + TrackState &state, + crl::time position); + + template + void trackPlayedTill( + const Track &track, + TrackState &state, + crl::time position); const std::unique_ptr _file; + static constexpr auto kReceivedTillEnd + = std::numeric_limits::max(); + // Immutable while File is active. std::unique_ptr _audio; std::unique_ptr _video; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp index 0acd8b624..56cc36c40 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_utility.cpp @@ -167,13 +167,26 @@ crl::time PtsToTime(int64_t pts, AVRational timeBase) { : ((pts * 1000LL * timeBase.num) / timeBase.den); } +crl::time PtsToTimeCeil(int64_t pts, AVRational timeBase) { + return (pts == AV_NOPTS_VALUE || !timeBase.den) + ? kTimeUnknown + : ((pts * 1000LL * timeBase.num + timeBase.den - 1) / timeBase.den); +} + int64_t TimeToPts(crl::time time, AVRational timeBase) { return (time == kTimeUnknown || !timeBase.num) ? AV_NOPTS_VALUE : (time * timeBase.den) / (1000LL * timeBase.num); } -crl::time FramePosition(Stream &stream) { +crl::time PacketPosition(const Packet &packet, AVRational timeBase) { + const auto &native = packet.fields(); + return PtsToTime( + (native.pts == AV_NOPTS_VALUE) ? native.dts : native.pts, + timeBase); +} + +crl::time FramePosition(const Stream &stream) { const auto pts = !stream.frame ? AV_NOPTS_VALUE : (stream.frame->pts == AV_NOPTS_VALUE) diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_utility.h b/Telegram/SourceFiles/media/streaming/media_streaming_utility.h index 2217f43e3..37747282c 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_utility.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_utility.h @@ -149,8 +149,13 @@ void LogError(QLatin1String method); void LogError(QLatin1String method, AvErrorWrap error); [[nodiscard]] crl::time PtsToTime(int64_t pts, AVRational timeBase); +// Used for full duration conversion. +[[nodiscard]] crl::time PtsToTimeCeil(int64_t pts, AVRational timeBase); [[nodiscard]] int64_t TimeToPts(crl::time time, AVRational timeBase); -[[nodiscard]] crl::time FramePosition(Stream &stream); +[[nodiscard]] crl::time PacketPosition( + const Packet &packet, + AVRational timeBase); +[[nodiscard]] crl::time FramePosition(const Stream &stream); [[nodiscard]] int ReadRotationFromMetadata(not_null stream); [[nodiscard]] bool RotationSwapWidthHeight(int rotation); [[nodiscard]] AvErrorWrap ProcessPacket(Stream &stream, Packet &&packet); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp index a9f914ed9..c454f847f 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.cpp @@ -48,6 +48,9 @@ private: void presentFrameIfNeeded(); void callReady(); + // Force frame position to be clamped to [0, duration] and monotonic. + [[nodiscard]] crl::time currentFramePosition() const; + [[nodiscard]] crl::time trackTime() const; const crl::weak_on_queue _weak; @@ -62,7 +65,8 @@ private: Fn _error; crl::time _startedTime = kTimeUnknown; crl::time _startedPosition = kTimeUnknown; - rpl::variable _nextFramePosition = kTimeUnknown; + mutable crl::time _previousFramePosition = kTimeUnknown; + rpl::variable _nextFrameDisplayPosition = kTimeUnknown; bool _queued = false; base::ConcurrentTimer _readFramesTimer; @@ -86,8 +90,9 @@ VideoTrackObject::VideoTrackObject( } rpl::producer VideoTrackObject::displayFrameAt() const { - return _nextFramePosition.value() | rpl::map([=](crl::time position) { - return _startedTime + (position - _startedPosition); + return _nextFrameDisplayPosition.value( + ) | rpl::map([=](crl::time displayPosition) { + return _startedTime + (displayPosition - _startedPosition); }); } @@ -143,7 +148,7 @@ bool VideoTrackObject::readFrame(not_null frame) { } return false; } - const auto position = FramePosition(_stream); + const auto position = currentFramePosition(); if (position == kTimeUnknown) { interrupt(); _error(); @@ -154,6 +159,7 @@ bool VideoTrackObject::readFrame(not_null frame) { QSize(), std::move(frame->original)); frame->position = position; + frame->displayPosition = position; // #TODO streaming adjust / sync frame->displayed = kTimeUnknown; //frame->request @@ -164,8 +170,8 @@ bool VideoTrackObject::readFrame(not_null frame) { void VideoTrackObject::presentFrameIfNeeded() { const auto presented = _shared->presentFrame(trackTime()); - if (presented.position != kTimeUnknown) { - _nextFramePosition = presented.position; + if (presented.displayPosition != kTimeUnknown) { + _nextFrameDisplayPosition = presented.displayPosition; } queueReadFrames(presented.nextCheckDelay); } @@ -210,9 +216,21 @@ bool VideoTrackObject::tryReadFirstFrame(Packet &&packet) { return true; } +crl::time VideoTrackObject::currentFramePosition() const { + const auto position = std::min( + FramePosition(_stream), + _stream.duration); + if (_previousFramePosition != kTimeUnknown + && position <= _previousFramePosition) { + return kTimeUnknown; + } + _previousFramePosition = position; + return position; +} + bool VideoTrackObject::fillStateFromFrame() { - _startedPosition = FramePosition(_stream); - _nextFramePosition = _startedPosition; + _startedPosition = currentFramePosition(); + _nextFrameDisplayPosition = _startedPosition; return (_startedPosition != kTimeUnknown); } @@ -222,16 +240,19 @@ void VideoTrackObject::callReady() { const auto frame = _shared->frameForPaint(); Assert(frame != nullptr); - auto data = Information(); - data.videoDuration = _stream.duration; - data.videoSize = frame->original.size(); + auto data = VideoInformation(); + data.size = frame->original.size(); if (RotationSwapWidthHeight(_stream.rotation)) { - data.videoSize.transpose(); + data.size.transpose(); } - data.videoCover = frame->original; - data.videoRotation = _stream.rotation; - data.state.video.position = _startedPosition; - base::take(_ready)(data); + data.cover = frame->original; + data.rotation = _stream.rotation; + data.state.duration = _stream.duration; + data.state.position = _startedPosition; + data.state.receivedTill = _noMoreData + ? _stream.duration + : _startedPosition; + base::take(_ready)({ data }); } crl::time VideoTrackObject::trackTime() const { @@ -248,6 +269,7 @@ void VideoTrack::Shared::init(QImage &&cover, crl::time position) { _frames[0].original = std::move(cover); _frames[0].position = position; + _frames[0].displayPosition = position; // Usually main thread sets displayed time before _counter increment. // But in this case we update _counter, so we set a fake displayed time. @@ -271,7 +293,7 @@ not_null VideoTrack::Shared::getFrame(int index) { } bool VideoTrack::Shared::IsPrepared(not_null frame) { - return (frame->position != kTimeUnknown) + return (frame->displayPosition != kTimeUnknown) && (frame->displayed == kTimeUnknown) && !frame->original.isNull(); } @@ -281,7 +303,7 @@ bool VideoTrack::Shared::IsStale( crl::time trackTime) { Expects(IsPrepared(frame)); - return (frame->position < trackTime); + return (frame->displayPosition < trackTime); } auto VideoTrack::Shared::prepareState(crl::time trackTime) -> PrepareState { @@ -297,7 +319,7 @@ auto VideoTrack::Shared::prepareState(crl::time trackTime) -> PrepareState { } else if (!IsPrepared(next)) { return next; } else { - return PrepareNextCheck(frame->position - trackTime + 1); + return PrepareNextCheck(frame->displayPosition - trackTime + 1); } }; const auto finishPrepare = [&](int index) { @@ -323,8 +345,9 @@ auto VideoTrack::Shared::presentFrame(crl::time trackTime) -> PresentFrame { const auto present = [&](int counter, int index) -> PresentFrame { const auto frame = getFrame(index); Assert(IsPrepared(frame)); - const auto position = frame->position; + const auto position = frame->displayPosition; + // Release this frame to the main thread for rendering. _counter.store( (counter + 1) % (2 * kFramesCount), std::memory_order_release); @@ -338,7 +361,7 @@ auto VideoTrack::Shared::presentFrame(crl::time trackTime) -> PresentFrame { || IsStale(frame, trackTime)) { return { kTimeUnknown, crl::time(0) }; } - return { kTimeUnknown, (trackTime - frame->position + 1) }; + return { kTimeUnknown, (trackTime - frame->displayPosition + 1) }; }; switch (counter()) { @@ -354,27 +377,27 @@ auto VideoTrack::Shared::presentFrame(crl::time trackTime) -> PresentFrame { Unexpected("Counter value in VideoTrack::Shared::prepareState."); } -bool VideoTrack::Shared::markFrameDisplayed(crl::time now) { +crl::time VideoTrack::Shared::markFrameDisplayed(crl::time now) { const auto markAndJump = [&](int counter, int index) { const auto frame = getFrame(index); - if (frame->displayed == kTimeUnknown) { - frame->displayed = now; - } + Assert(frame->displayed == kTimeUnknown); + + frame->displayed = now; _counter.store( (counter + 1) % (2 * kFramesCount), std::memory_order_release); - return true; + return frame->position; }; switch (counter()) { - case 0: return false; + case 0: return kTimeUnknown; case 1: return markAndJump(1, 1); - case 2: return false; + case 2: return kTimeUnknown; case 3: return markAndJump(3, 2); - case 4: return false; + case 4: return kTimeUnknown; case 5: return markAndJump(5, 3); - case 6: return false; + case 6: return kTimeUnknown; case 7: return markAndJump(7, 0); } Unexpected("Counter value in VideoTrack::Shared::markFrameDisplayed."); @@ -422,13 +445,14 @@ void VideoTrack::start() { }); } -void VideoTrack::markFrameDisplayed(crl::time now) { - if (!_shared->markFrameDisplayed(now)) { - return; +crl::time VideoTrack::markFrameDisplayed(crl::time now) { + const auto position = _shared->markFrameDisplayed(now); + if (position != kTimeUnknown) { + _wrapped.with([](Implementation &unwrapped) { + unwrapped.frameDisplayed(); + }); } - _wrapped.with([](Implementation &unwrapped) { - unwrapped.frameDisplayed(); - }); + return position; } QImage VideoTrack::frame(const FrameRequest &request) const { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h index f96dde413..f5826ed2e 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_video_track.h @@ -34,7 +34,8 @@ public: // Called from the main thread. void start(); - void markFrameDisplayed(crl::time now); + // Returns the position of the displayed frame. + [[nodiscard]] crl::time markFrameDisplayed(crl::time now); [[nodiscard]] QImage frame(const FrameRequest &request) const; [[nodiscard]] rpl::producer renderNextFrame() const; @@ -47,7 +48,7 @@ private: struct Frame { QImage original; crl::time position = kTimeUnknown; - //crl::time presentation = kTimeUnknown; + crl::time displayPosition = kTimeUnknown; crl::time displayed = kTimeUnknown; FrameRequest request; @@ -62,7 +63,7 @@ private: PrepareFrame, PrepareNextCheck>; struct PresentFrame { - crl::time position = kTimeUnknown; + crl::time displayPosition = kTimeUnknown; crl::time nextCheckDelay = 0; }; @@ -74,7 +75,8 @@ private: [[nodiscard]] PresentFrame presentFrame(crl::time trackTime); // Called from the main thread. - [[nodiscard]] bool markFrameDisplayed(crl::time now); + // Returns the position of the displayed frame. + [[nodiscard]] crl::time markFrameDisplayed(crl::time now); [[nodiscard]] not_null frameForPaint(); private: diff --git a/Telegram/SourceFiles/rpl/event_stream.h b/Telegram/SourceFiles/rpl/event_stream.h index 12c58f4ac..52186490f 100644 --- a/Telegram/SourceFiles/rpl/event_stream.h +++ b/Telegram/SourceFiles/rpl/event_stream.h @@ -73,10 +73,10 @@ public: }); } auto events_starting_with(Value &&value) const { - return single(std::move(value)) | then(events()); + return single(std::move(value)) | then(events()); } auto events_starting_with_copy(const Value &value) const { - return single(value) | then(events()); + return single(value) | then(events()); } bool has_consumers() const { return (_data != nullptr) && !_data->consumers.empty(); diff --git a/Telegram/SourceFiles/rpl/variable.h b/Telegram/SourceFiles/rpl/variable.h index 55eb3a0a9..f318ecf7b 100644 --- a/Telegram/SourceFiles/rpl/variable.h +++ b/Telegram/SourceFiles/rpl/variable.h @@ -61,7 +61,7 @@ constexpr bool supports_equality_compare_v } // namespace details -template +template class variable final { public: variable() : _data{} { @@ -127,6 +127,30 @@ public: return _changes.events(); } + // Send 'done' to all subscribers and unsubscribe them. + template < + typename OtherType, + typename = std::enable_if_t< + std::is_assignable_v>> + void reset(OtherType &&data) { + _data = std::forward(data); + _changes = event_stream(); + } + void reset() { + reset(Type()); + } + + template < + typename OtherError, + typename = std::enable_if_t< + std::is_constructible_v>> + void reset_with_error(OtherError &&error) { + _changes.fire_error(std::forward(error)); + } + void reset_with_error() { + reset_with_error(Error()); + } + private: template variable &assign(OtherType &&data) { @@ -149,7 +173,7 @@ private: } Type _data; - event_stream _changes; + event_stream _changes; lifetime _lifetime; };