From 7b69282c7e148c02687dffda838c55f6a695fba7 Mon Sep 17 00:00:00 2001 From: John Preston Date: Fri, 20 Oct 2017 19:18:26 +0300 Subject: [PATCH] Add rpl::merge(). --- Telegram/SourceFiles/rpl/combine.h | 24 ++--- Telegram/SourceFiles/rpl/merge.h | 163 +++++++++++++++++++++++++++++ Telegram/SourceFiles/rpl/rpl.h | 1 + Telegram/gyp/tests/tests.gyp | 1 + 4 files changed, 176 insertions(+), 13 deletions(-) create mode 100644 Telegram/SourceFiles/rpl/merge.h diff --git a/Telegram/SourceFiles/rpl/combine.h b/Telegram/SourceFiles/rpl/combine.h index 410e0f7d8..6767c044f 100644 --- a/Telegram/SourceFiles/rpl/combine.h +++ b/Telegram/SourceFiles/rpl/combine.h @@ -82,8 +82,9 @@ public: } } } - }, [consumer = _consumer](Error &&error) { - consumer.put_error(std::move(error)); + }, [consumer = _consumer](auto &&error) { + consumer.put_error_forward( + std::forward(error)); }, [consumer = _consumer, state = _state] { if (!--state->working) { consumer.put_done(); @@ -109,11 +110,7 @@ inline void combine_subscribe( std::index_sequence, std::tuple...> &&saved) { auto consume = { ( - details::combine_subscribe_one< - I, - consumer_type, - Values... - >( + combine_subscribe_one( consumer, state ).subscribe(std::get(std::move(saved))), 0)... }; @@ -137,7 +134,7 @@ template < class combine_implementation_helper...> { public: using CombinedValue = std::tuple; - using CombinedError = details::normalized_variant_t; + using CombinedError = normalized_variant_t; combine_implementation_helper( producer &&...producers) @@ -147,9 +144,9 @@ public: template lifetime operator()(const consumer &consumer) { auto state = consumer.template make_state< - details::combine_state>(); + combine_state>(); constexpr auto kArity = sizeof...(Values); - details::combine_subscribe( + combine_subscribe( consumer, state, std::make_index_sequence(), @@ -170,7 +167,7 @@ template < inline auto combine_implementation( producer &&...producers) { using CombinedValue = std::tuple; - using CombinedError = details::normalized_variant_t; + using CombinedError = normalized_variant_t; return make_producer( make_combine_implementation_helper(std::move(producers)...)); @@ -333,8 +330,9 @@ inline auto combine( consumer.put_next_copy(state->latest); } } - }, [consumer](Error &&error) { - consumer.put_error(std::move(error)); + }, [consumer](auto &&error) { + consumer.put_error_forward( + std::forward(error)); }, [consumer, state] { if (!--state->working) { consumer.put_done(); diff --git a/Telegram/SourceFiles/rpl/merge.h b/Telegram/SourceFiles/rpl/merge.h new file mode 100644 index 000000000..d254f7419 --- /dev/null +++ b/Telegram/SourceFiles/rpl/merge.h @@ -0,0 +1,163 @@ +/* +This file is part of Telegram Desktop, +the official desktop version of Telegram messaging app, see https://telegram.org + +Telegram Desktop is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +It is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +In addition, as a special exception, the copyright holders give permission +to link the code of portions of this program with the OpenSSL library. + +Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE +Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org +*/ +#pragma once + +#include +#include + +namespace rpl { +namespace details { + +struct merge_state { + merge_state(int working) : working(working) { + } + int working = 0; +}; + +template +class merge_subscribe_one { +public: + merge_subscribe_one( + const consumer_type &consumer, + merge_state *state) + : _consumer(consumer) + , _state(state) { + } + + template + void subscribe(producer &&producer) { + _consumer.add_lifetime(std::move(producer).start( + [consumer = _consumer](auto &&value) { + consumer.put_next_forward( + std::forward(value)); + }, [consumer = _consumer](auto &&error) { + consumer.put_error_forward( + std::forward(error)); + }, [consumer = _consumer, state = _state] { + if (!--state->working) { + consumer.put_done(); + } + })); + } + +private: + const consumer_type &_consumer; + merge_state *_state = nullptr; + +}; + +template < + typename consumer_type, + typename Value, + typename Error, + typename ...Generators, + std::size_t ...I> +inline void merge_subscribe( + const consumer_type &consumer, + merge_state *state, + std::index_sequence, + std::tuple...> &&saved) { + auto consume = { ( + details::merge_subscribe_one( + consumer, + state + ).subscribe(std::get(std::move(saved))), 0)... }; + (void)consume; +} + +template +class merge_implementation_helper; + +template +merge_implementation_helper...> +make_merge_implementation_helper(Producers &&...producers) { + return merge_implementation_helper...>( + std::forward(producers)...); +} + +template < + typename Value, + typename Error, + typename ...Generators> +class merge_implementation_helper...> { +public: + merge_implementation_helper( + producer &&...producers) + : _saved(std::make_tuple(std::move(producers)...)) { + } + + template + lifetime operator()(const consumer &consumer) { + auto state = consumer.template make_state< + details::merge_state>(sizeof...(Generators)); + constexpr auto kArity = sizeof...(Generators); + details::merge_subscribe( + consumer, + state, + std::make_index_sequence(), + std::move(_saved)); + + return lifetime(); + } + +private: + std::tuple...> _saved; + +}; + +template < + typename Value, + typename Error, + typename ...Generators> +inline auto merge_implementation( + producer &&...producers) { + return make_producer( + make_merge_implementation_helper(std::move(producers)...)); +} + +template +struct merge_producers : std::false_type { +}; + +template +constexpr bool merge_producers_v + = merge_producers::value; + +template < + typename Value, + typename Error, + typename ...Generators> +struct merge_producers< + producer...> + : std::true_type { +}; + +} // namespace details + +template < + typename ...Args, + typename = std::enable_if_t< + details::merge_producers_v>> +inline decltype(auto) merge(Args &&...args) { + return details::merge_implementation(std::move(args)...); +} + +} // namespace rpl diff --git a/Telegram/SourceFiles/rpl/rpl.h b/Telegram/SourceFiles/rpl/rpl.h index 6b66b8027..1805607f6 100644 --- a/Telegram/SourceFiles/rpl/rpl.h +++ b/Telegram/SourceFiles/rpl/rpl.h @@ -35,6 +35,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include #include #include +#include #include #include #include diff --git a/Telegram/gyp/tests/tests.gyp b/Telegram/gyp/tests/tests.gyp index dd1c59379..7fc33b7f2 100644 --- a/Telegram/gyp/tests/tests.gyp +++ b/Telegram/gyp/tests/tests.gyp @@ -115,6 +115,7 @@ '<(src_loc)/rpl/lifetime.h', '<(src_loc)/rpl/map.h', '<(src_loc)/rpl/mappers.h', + '<(src_loc)/rpl/merge.h', '<(src_loc)/rpl/never.h', '<(src_loc)/rpl/operators_tests.cpp', '<(src_loc)/rpl/producer.h',