From 771a51224e11a162af58b60c0b0cedb730484d15 Mon Sep 17 00:00:00 2001 From: John Preston Date: Sat, 16 Feb 2019 16:47:52 +0400 Subject: [PATCH] Support errors in rpl::event_stream. --- Telegram/SourceFiles/rpl/event_stream.h | 225 ++++++++++++++++-------- Telegram/SourceFiles/rpl/range.h | 7 +- 2 files changed, 156 insertions(+), 76 deletions(-) diff --git a/Telegram/SourceFiles/rpl/event_stream.h b/Telegram/SourceFiles/rpl/event_stream.h index 94177d4c1..cd5717731 100644 --- a/Telegram/SourceFiles/rpl/event_stream.h +++ b/Telegram/SourceFiles/rpl/event_stream.h @@ -12,6 +12,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include #include #include +#include #include "base/assertion.h" #include "base/index_based_iterator.h" @@ -19,7 +20,7 @@ namespace rpl { // Currently not thread-safe :( -template +template class event_stream { public: event_stream(); @@ -34,17 +35,29 @@ public: void fire_copy(const Value &value) const { return fire_forward(value); } -#if defined _MSC_VER && _MSC_VER >= 1914 && false - producer events() const { -#else // _MSC_VER >= 1914 + + template + void fire_error_forward(OtherError &&error) const; + void fire_error(Error &&error) const { + return fire_error_forward(std::move(error)); + } + void fire_error_copy(const Error &error) const { + return fire_error_forward(error); + } + + void fire_done() const; + +#if defined _MSC_VER && _MSC_VER >= 1914 && _MSC_VER < 1916 + producer events() const { +#else // _MSC_VER >= 1914 && _MSC_VER < 1916 auto events() const { -#endif // _MSC_VER >= 1914 - return make_producer([weak = make_weak()]( +#endif // _MSC_VER >= 1914 && _MSC_VER < 1916 + return make_producer([weak = make_weak()]( const auto &consumer) { - if (auto strong = weak.lock()) { + if (const auto strong = weak.lock()) { auto result = [weak, consumer] { - if (auto strong = weak.lock()) { - auto it = std::find( + if (const auto strong = weak.lock()) { + const auto it = std::find( strong->consumers.begin(), strong->consumers.end(), consumer); @@ -73,7 +86,7 @@ public: private: struct Data { - std::vector> consumers; + std::vector> consumers; int depth = 0; }; std::weak_ptr make_weak() const; @@ -82,72 +95,113 @@ private: }; -template -inline event_stream::event_stream() { +template +inline event_stream::event_stream() { } -template -inline event_stream::event_stream(event_stream &&other) +template +inline event_stream::event_stream(event_stream &&other) : _data(details::take(other._data)) { } -template -inline event_stream &event_stream::operator=( +template +inline event_stream &event_stream::operator=( event_stream &&other) { - _data = details::take(other._data); + if (this != &other) { + fire_done(); + _data = details::take(other._data); + } return *this; } -template +template template -inline void event_stream::fire_forward( +inline void event_stream::fire_forward( OtherValue &&value) const { - auto copy = _data; - if (!copy) { + if (!_data) { + return; + } + const auto copy = _data; + auto &consumers = copy->consumers; + if (consumers.empty()) { return; } ++copy->depth; - auto &consumers = copy->consumers; - auto begin = base::index_based_begin(consumers); - auto end = base::index_based_end(consumers); - if (begin != end) { - // Copy value for every consumer except the last. - auto prev = end - 1; - auto removeFrom = std::remove_if(begin, prev, [&](auto &consumer) { - return !consumer.put_next_copy(value); - }); + const auto begin = base::index_based_begin(consumers); + const auto end = base::index_based_end(consumers); - // Perhaps move value for the last consumer. - if (prev->put_next_forward(std::forward(value))) { - if (removeFrom != prev) { - *removeFrom++ = std::move(*prev); - } else { - ++removeFrom; + // Copy value for every consumer except the last. + const auto prev = end - 1; + auto staleFrom = std::remove_if(begin, prev, [&](const auto &consumer) { + return !consumer.put_next_copy(value); + }); + + // Perhaps move value for the last consumer. + if (prev->put_next_forward(std::forward(value))) { + if (staleFrom != prev) { + *staleFrom++ = std::move(*prev); + } else { + ++staleFrom; + } + } + + if (staleFrom != end) { + // Move new consumers. + const auto newEnd = base::index_based_end(consumers); + if (newEnd != end) { + Assert(newEnd > end); + for (auto i = end; i != newEnd;) { + *staleFrom++ = *i++; } } - if (removeFrom != end) { - // Move new consumers. - auto newEnd = base::index_based_end(consumers); - if (newEnd != end) { - Assert(newEnd > end); - while (end != newEnd) { - *removeFrom++ = *end++; - } - } - - // Erase stale consumers. - if (copy->depth == 1) { - consumers.erase(removeFrom.base(), consumers.end()); - } + // Erase stale consumers. + if (copy->depth == 1) { + consumers.erase(staleFrom.base(), consumers.end()); } } --copy->depth; } -template -inline auto event_stream::make_weak() const +template +template +inline void event_stream::fire_error_forward( + OtherError &&error) const { + if (!_data) { + return; + } + const auto data = std::move(_data); + const auto &consumers = data->consumers; + if (consumers.empty()) { + return; + } + const auto begin = base::index_based_begin(consumers); + const auto end = base::index_based_end(consumers); + + // Copy error for every consumer except the last. + const auto prev = end - 1; + std::for_each(begin, prev, [&](const auto &consumer) { + consumer.put_error_copy(error); + }); + + // Perhaps move error for the last consumer. + prev->put_error_forward(std::forward(error)); + + // Just drop any new consumers. +} + +template +void event_stream::fire_done() const { + if (const auto data = details::take(_data)) { + for (const auto &consumer : data->consumers) { + consumer.put_done(); + } + } +} + +template +inline auto event_stream::make_weak() const -> std::weak_ptr { if (!_data) { _data = std::make_shared(); @@ -155,23 +209,30 @@ inline auto event_stream::make_weak() const return _data; } - -template -inline event_stream::~event_stream() { - if (auto data = details::take(_data)) { - for (auto &consumer : data->consumers) { - consumer.put_done(); - } - } +template +inline event_stream::~event_stream() { + fire_done(); } -template +template inline auto start_to_stream( - event_stream &stream, + event_stream &stream, lifetime &alive_while) { - return start_with_next([&stream](auto &&value) { - stream.fire_forward(std::forward(value)); - }, alive_while); + if constexpr (std::is_same_v) { + return start_with_next_done([&](auto &&value) { + stream.fire_forward(std::forward(value)); + }, [&] { + stream.fire_done(); + }, alive_while); + } else { + return start_with_next_error_done([&](auto &&value) { + stream.fire_forward(std::forward(value)); + }, [&](auto &&error) { + stream.fire_error_forward(std::forward(error)); + }, [&] { + stream.fire_done(); + }, alive_while); + } } namespace details { @@ -184,19 +245,37 @@ public: template auto operator()(producer &&initial) { - auto stream = _lifetime.make_state>(); - auto collected = std::vector(); - { + auto stream = _lifetime.make_state>(); + auto values = std::vector(); + if constexpr (std::is_same_v) { auto collecting = stream->events().start( - [&collected](Value &&value) { - collected.push_back(std::move(value)); - }, + [&](Value &&value) { values.push_back(std::move(value)); }, [](const Error &error) {}, [] {}); std::move(initial) | start_to_stream(*stream, _lifetime); + collecting.destroy(); + + return vector(std::move(values)) | then(stream->events()); + } else { + auto maybeError = std::optional(); + auto collecting = stream->events().start( + [&](Value && value) { values.push_back(std::move(value)); }, + [](Error &&error) { maybeError = std::move(error); }, + [] {}); + std::move(initial) | start_to_stream(*stream, _lifetime); + collecting.destroy(); + + if (maybeError.has_value()) { + return rpl::producer([ + error = std::move(*maybeError) + ](const auto &consumer) mutable { + consumer.put_error(std::move(error)); + }); + } + return rpl::producer(vector( + std::move(values) + ) | then(stream->events())); } - return vector(std::move(collected)) - | then(stream->events()); } private: diff --git a/Telegram/SourceFiles/rpl/range.h b/Telegram/SourceFiles/rpl/range.h index 4cbd5d4be..d37b5afca 100644 --- a/Telegram/SourceFiles/rpl/range.h +++ b/Telegram/SourceFiles/rpl/range.h @@ -31,9 +31,9 @@ inline auto single() { }); } -template +template inline auto vector(std::vector &&values) { - return make_producer([ + return make_producer([ values = std::move(values) ](const auto &consumer) mutable { for (auto &value : values) { @@ -44,8 +44,9 @@ inline auto vector(std::vector &&values) { }); } +template inline auto vector(std::vector &&values) { - return make_producer([ + return make_producer([ values = std::move(values) ](const auto &consumer) { for (auto value : values) {