diff --git a/Telegram/SourceFiles/rpl/consumer.h b/Telegram/SourceFiles/rpl/consumer.h index 8adfc500c..eca674c01 100644 --- a/Telegram/SourceFiles/rpl/consumer.h +++ b/Telegram/SourceFiles/rpl/consumer.h @@ -329,6 +329,11 @@ public: Type *make_state(Args&& ...args) const; void terminate() const; + auto terminator() const { + return [self = *this] { + self.terminate(); + }; + } const details::type_erased_handlers *comparable() const { return _handlers.get(); diff --git a/Telegram/SourceFiles/rpl/flatten_latest.h b/Telegram/SourceFiles/rpl/flatten_latest.h index bce4a4e21..85d0d4079 100644 --- a/Telegram/SourceFiles/rpl/flatten_latest.h +++ b/Telegram/SourceFiles/rpl/flatten_latest.h @@ -44,7 +44,7 @@ public: [consumer, state](producer &&inner) { state->finished = false; state->alive = lifetime(); - auto started = std::move(inner).start( + std::move(inner).start( [consumer](auto &&value) { consumer.put_next_forward(std::forward(value)); }, [consumer](auto &&error) { @@ -55,10 +55,7 @@ public: } else { state->finished = true; } - }); - if (started) { - state->alive = std::move(started); - } + }, state->alive); }, [consumer](auto &&error) { consumer.put_error_forward(std::forward(error)); }, [consumer, state] { diff --git a/Telegram/SourceFiles/rpl/operators_tests.cpp b/Telegram/SourceFiles/rpl/operators_tests.cpp index b46b61583..a9f70e619 100644 --- a/Telegram/SourceFiles/rpl/operators_tests.cpp +++ b/Telegram/SourceFiles/rpl/operators_tests.cpp @@ -406,6 +406,24 @@ TEST_CASE("basic operators tests", "[rpl::operators]") { *sum += "done"; }, lifetime); } - REQUIRE(*sum == "012done"); + { + rpl::lifetime lifetime; + rpl::ints(3) | take(3) + | start_with_next_done([=](int value) { + *sum += std::to_string(value); + }, [=] { + *sum += "done"; + }, lifetime); + } + { + rpl::lifetime lifetime; + rpl::ints(3) | take(10) + | start_with_next_done([=](int value) { + *sum += std::to_string(value); + }, [=] { + *sum += "done"; + }, lifetime); + } + REQUIRE(*sum == "012done012done012done"); } } diff --git a/Telegram/SourceFiles/rpl/producer.h b/Telegram/SourceFiles/rpl/producer.h index 538dd34be..1003b5cd1 100644 --- a/Telegram/SourceFiles/rpl/producer.h +++ b/Telegram/SourceFiles/rpl/producer.h @@ -346,7 +346,7 @@ template inline void producer_base::start_existing( const consumer_type &consumer, lifetime &alive_while) && { - alive_while.add([consumer] { consumer.terminate(); }); + alive_while.add(consumer.terminator()); consumer.add_lifetime(std::move(_generator)(consumer)); } diff --git a/Telegram/SourceFiles/rpl/take.h b/Telegram/SourceFiles/rpl/take.h index 1556b31a9..c8025f03e 100644 --- a/Telegram/SourceFiles/rpl/take.h +++ b/Telegram/SourceFiles/rpl/take.h @@ -38,7 +38,7 @@ public: limit = _count ](const auto &consumer) mutable { auto count = consumer.template make_state(limit); - return std::move(initial).start( + auto initial_consumer = make_consumer( [consumer, count](auto &&value) { auto left = (*count)--; if (left) { @@ -55,6 +55,8 @@ public: }, [consumer] { consumer.put_done(); }); + consumer.add_lifetime(initial_consumer.terminator()); + return std::move(initial).start_existing(initial_consumer); }); }