diff --git a/Telegram/SourceFiles/base/algorithm.h b/Telegram/SourceFiles/base/algorithm.h index aea8ecd1a..b06e24eb6 100644 --- a/Telegram/SourceFiles/base/algorithm.h +++ b/Telegram/SourceFiles/base/algorithm.h @@ -22,6 +22,16 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org namespace base { +template +inline Type take(Type &value) { + return std::exchange(value, Type {}); +} + +template +inline constexpr size_t array_size(const Type(&)[Size]) { + return Size; +} + // This version of remove_if allows predicate to push_back() items. // The added items won't be tested for predicate but just left in the container. template diff --git a/Telegram/SourceFiles/core/utils.h b/Telegram/SourceFiles/core/utils.h index 3c46326dd..2d9d5eb7b 100644 --- a/Telegram/SourceFiles/core/utils.h +++ b/Telegram/SourceFiles/core/utils.h @@ -22,6 +22,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "core/basic_types.h" #include "base/flags.h" +#include "base/algorithm.h" // Define specializations for QByteArray for Qt 5.3.2, because // QByteArray in Qt 5.3.2 doesn't declare "pointer" subtype. @@ -42,17 +43,6 @@ inline span make_span(const QByteArray &cont) { #endif // OS_MAC_OLD namespace base { - -template -inline constexpr size_t array_size(const T(&)[N]) { - return N; -} - -template -inline T take(T &source) { - return std::exchange(source, T()); -} - namespace internal { template diff --git a/Telegram/SourceFiles/rpl/consumer.h b/Telegram/SourceFiles/rpl/consumer.h index 75601cbfe..e07ab7717 100644 --- a/Telegram/SourceFiles/rpl/consumer.h +++ b/Telegram/SourceFiles/rpl/consumer.h @@ -25,11 +25,14 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org namespace rpl { -struct no_value { +template +struct no_type { + no_type() = delete; }; +using no_value = no_type<'V'>; +using no_error = no_type<'E'>; -struct no_error { - no_error() = delete; +struct empty_value { }; template @@ -47,11 +50,11 @@ public: OnError &&error, OnDone &&done); - bool putNext(Value value) const; - void putError(Error error) const; - void putDone() const; + bool put_next(Value value) const; + void put_error(Error error) const; + void put_done() const; - void setLifetime(lifetime &&lifetime) const; + void set_lifetime(lifetime &&lifetime) const; void terminate() const; bool operator==(const consumer &other) const { @@ -93,11 +96,11 @@ private: template class consumer::abstract_consumer_instance { public: - virtual bool putNext(Value value) = 0; - virtual void putError(Error error) = 0; - virtual void putDone() = 0; + virtual bool put_next(Value value) = 0; + virtual void put_error(Error error) = 0; + virtual void put_done() = 0; - void setLifetime(lifetime &&lifetime); + void set_lifetime(lifetime &&lifetime); void terminate(); protected: @@ -122,9 +125,9 @@ public: , _done(std::forward(done)) { } - bool putNext(Value value) override; - void putError(Error error) override; - void putDone() override; + bool put_next(Value value) override; + void put_error(Error error) override; + void put_done() override; private: OnNext _next; @@ -167,9 +170,9 @@ consumer::consumer( } template -bool consumer::putNext(Value value) const { +bool consumer::put_next(Value value) const { if (_instance) { - if (_instance->putNext(std::move(value))) { + if (_instance->put_next(std::move(value))) { return true; } _instance = nullptr; @@ -178,23 +181,23 @@ bool consumer::putNext(Value value) const { } template -void consumer::putError(Error error) const { +void consumer::put_error(Error error) const { if (_instance) { - std::exchange(_instance, nullptr)->putError(std::move(error)); + std::exchange(_instance, nullptr)->put_error(std::move(error)); } } template -void consumer::putDone() const { +void consumer::put_done() const { if (_instance) { - std::exchange(_instance, nullptr)->putDone(); + std::exchange(_instance, nullptr)->put_done(); } } template -void consumer::setLifetime(lifetime &&lifetime) const { +void consumer::set_lifetime(lifetime &&lifetime) const { if (_instance) { - _instance->setLifetime(std::move(lifetime)); + _instance->set_lifetime(std::move(lifetime)); } else { lifetime.destroy(); } @@ -208,7 +211,7 @@ void consumer::terminate() const { } template -void consumer::abstract_consumer_instance::setLifetime( +void consumer::abstract_consumer_instance::set_lifetime( lifetime &&lifetime) { std::unique_lock lock(_mutex); if (_terminated) { @@ -234,7 +237,7 @@ void consumer::abstract_consumer_instance::terminate() { template template -bool consumer::consumer_instance::putNext( +bool consumer::consumer_instance::put_next( Value value) { std::unique_lock lock(this->_mutex); if (this->_terminated) { @@ -249,7 +252,7 @@ bool consumer::consumer_instance::putNext template template -void consumer::consumer_instance::putError( +void consumer::consumer_instance::put_error( Error error) { std::unique_lock lock(this->_mutex); if (!this->_terminated) { @@ -263,7 +266,7 @@ void consumer::consumer_instance::putErro template template -void consumer::consumer_instance::putDone() { +void consumer::consumer_instance::put_done() { std::unique_lock lock(this->_mutex); if (!this->_terminated) { auto handler = std::move(this->_done); diff --git a/Telegram/SourceFiles/rpl/event_stream.h b/Telegram/SourceFiles/rpl/event_stream.h index ef4f1c419..ad9b6cdc6 100644 --- a/Telegram/SourceFiles/rpl/event_stream.h +++ b/Telegram/SourceFiles/rpl/event_stream.h @@ -22,6 +22,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "producer.h" #include "base/algorithm.h" +#include "base/assertion.h" namespace rpl { @@ -29,53 +30,53 @@ template class event_stream { public: event_stream(); + event_stream(event_stream &&other); void fire(Value value); - producer events(); + producer events() const; ~event_stream(); private: - std::weak_ptr weak() const { - return _strong; - } - void addConsumer(consumer &&consumer) { - _consumers.push_back(std::move(consumer)); - } - void removeConsumer(const consumer &consumer) { - auto it = base::find(_consumers, consumer); - if (it != _consumers.end()) { - it->terminate(); - } + std::weak_ptr>> weak() const { + return _consumers; } - std::shared_ptr _strong; - std::vector> _consumers; + std::shared_ptr>> _consumers; }; template event_stream::event_stream() - : _strong(std::make_shared(this)) { + : _consumers(std::make_shared>>()) { +} + +template +event_stream::event_stream(event_stream &&other) + : _consumers(base::take(other._consumers)) { } template void event_stream::fire(Value value) { - base::push_back_safe_remove_if(_consumers, [&](auto &consumer) { - return !consumer.putNext(value); + Expects(_consumers != nullptr); + base::push_back_safe_remove_if(*_consumers, [&](auto &consumer) { + return !consumer.put_next(value); }); } template -producer event_stream::events() { +producer event_stream::events() const { return producer([weak = weak()](consumer consumer) { if (auto strong = weak.lock()) { auto result = [weak, consumer] { if (auto strong = weak.lock()) { - (*strong)->removeConsumer(consumer); + auto it = base::find(*strong, consumer); + if (it != strong->end()) { + it->terminate(); + } } }; - (*strong)->addConsumer(std::move(consumer)); + strong->push_back(std::move(consumer)); return lifetime(std::move(result)); } return lifetime(); @@ -84,8 +85,10 @@ producer event_stream::events() { template event_stream::~event_stream() { - for (auto &consumer : _consumers) { - consumer.putDone(); + if (_consumers) { + for (auto &consumer : *_consumers) { + consumer.put_done(); + } } } diff --git a/Telegram/SourceFiles/rpl/lifetime.h b/Telegram/SourceFiles/rpl/lifetime.h index 59f2c39b5..96d73e953 100644 --- a/Telegram/SourceFiles/rpl/lifetime.h +++ b/Telegram/SourceFiles/rpl/lifetime.h @@ -22,7 +22,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "base/lambda.h" #include "base/algorithm.h" -#include +#include namespace rpl { @@ -33,45 +33,64 @@ public: lifetime &operator=(lifetime &&other); template ()())> - lifetime(Destroy &&destroy) : _destroy(std::forward(destroy)) { + lifetime(Destroy &&destroy); + + explicit operator bool() const { return !_callbacks.empty(); } + + template ()())> + void add(Destroy &&destroy); + void add(lifetime &&other); + void destroy(); + + template + Type *make_state(Args&& ...args) { + auto result = new Type(std::forward(args)...); + add([result]() mutable { + static_assert(sizeof(Type) > 0, "Can't delete unknown type."); + delete base::take(result); + }); + return result; } - void add(lifetime other) { - _nested.push_back(std::move(other)); - } - - void destroy() { - auto nested = std::exchange(_nested, std::vector()); - auto callback = std::exchange(_destroy, base::lambda_once()); - - if (!nested.empty()) { - nested.clear(); - } - if (callback) { - callback(); - } - } - - ~lifetime() { - destroy(); - } + ~lifetime() { destroy(); } private: - base::lambda_once _destroy; - std::vector _nested; + std::deque> _callbacks; }; -lifetime::lifetime(lifetime &&other) -: _destroy(std::exchange(other._destroy, base::lambda_once())) -, _nested(std::exchange(other._nested, std::vector())) { +inline lifetime::lifetime(lifetime &&other) +: _callbacks(base::take(other._callbacks)) { } -lifetime &lifetime::operator=(lifetime &&other) { - std::swap(_destroy, other._destroy); - std::swap(_nested, other._nested); +inline lifetime &lifetime::operator=(lifetime &&other) { + std::swap(_callbacks, other._callbacks); other.destroy(); return *this; } +template +inline lifetime::lifetime(Destroy &&destroy) { + _callbacks.emplace_back(std::forward(destroy)); +} + +template +inline void lifetime::add(Destroy &&destroy) { + _callbacks.push_front(destroy); +} + +inline void lifetime::add(lifetime &&other) { + auto callbacks = base::take(other._callbacks); + _callbacks.insert( + _callbacks.begin(), + std::make_move_iterator(callbacks.begin()), + std::make_move_iterator(callbacks.end())); +} + +inline void lifetime::destroy() { + for (auto &callback : base::take(_callbacks)) { + callback(); + } +} + } // namespace rpl diff --git a/Telegram/SourceFiles/rpl/producer.h b/Telegram/SourceFiles/rpl/producer.h index 22fdb5cae..67b3a4268 100644 --- a/Telegram/SourceFiles/rpl/producer.h +++ b/Telegram/SourceFiles/rpl/producer.h @@ -26,9 +26,12 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org namespace rpl { -template +template class producer { public: + using value_type = Value; + using error_type = Error; + template ()(std::declval>())), lifetime @@ -45,7 +48,7 @@ public: lifetime start( OnNext &&next, OnError &&error, - OnDone &&done); + OnDone &&done) const; private: base::lambda)> _generator; @@ -69,13 +72,519 @@ template < lifetime producer::start( OnNext &&next, OnError &&error, - OnDone &&done) { + OnDone &&done) const { auto result = consumer( std::forward(next), std::forward(error), std::forward(done)); - result.setLifetime(_generator(result)); + result.set_lifetime(_generator(result)); return [result] { result.terminate(); }; } +template +inline producer duplicate(const producer &producer) { + return producer; +} + +template < + typename Value, + typename Error, + typename Method, + typename = decltype(std::declval()(std::declval>()))> +inline decltype(auto) operator|(producer &&producer, Method &&method) { + return std::forward(method)(std::move(producer)); +} + +template +inline decltype(auto) bind_on_next(OnNext &&handler) { + return [handler = std::forward(handler)](auto &&existing) mutable { + using value_type = typename std::decay_t::value_type; + using error_type = typename std::decay_t::error_type; + return producer([ + existing = std::move(existing), + handler = std::forward(handler) + ](consumer consumer) { + return existing.start([handler = std::decay_t(handler)]( + value_type value) { + handler(value); + }, [consumer](error_type error) { + consumer.put_error(error); + }, [consumer] { + consumer.put_done(); + }); + }); + }; +} + +template +inline decltype(auto) bind_on_error(OnError &&handler) { + return [handler = std::forward(handler)](auto &&existing) mutable { + using value_type = typename std::decay_t::value_type; + using error_type = typename std::decay_t::error_type; + return producer([ + existing = std::move(existing), + handler = std::forward(handler) + ](consumer consumer) { + return existing.start([consumer](value_type value) { + consumer.put_next(value); + }, [handler = std::decay_t(handler)](error_type value) { + handler(value); + }, [consumer] { + consumer.put_done(); + }); + }); + }; +} + +template +inline decltype(auto) bind_on_done(OnDone &&handler) { + return [handler = std::forward(handler)](auto &&existing) mutable { + using value_type = typename std::decay_t::value_type; + using error_type = typename std::decay_t::error_type; + return producer([ + existing = std::move(existing), + handler = std::forward(handler) + ](consumer consumer) { + return existing.start([consumer](value_type value) { + consumer.put_next(value); + }, [consumer](error_type value) { + consumer.put_error(value); + }, [handler = std::decay_t(handler)] { + handler(); + }); + }); + }; +} + +namespace details { + +template +struct next_holder { + OnNext next; +}; + +template +struct error_holder { + OnError error; +}; + +template +struct done_holder { + OnDone done; +}; + +template < + typename Value, + typename Error, + typename OnNext> +struct producer_with_next { + producer producer; + OnNext next; +}; + +template < + typename Value, + typename Error, + typename OnError> +struct producer_with_error { + producer producer; + OnError error; +}; + +template < + typename Value, + typename Error, + typename OnDone> +struct producer_with_done { + producer producer; + OnDone done; +}; + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError> +struct producer_with_next_error { + producer producer; + OnNext next; + OnError error; +}; + +template < + typename Value, + typename Error, + typename OnNext, + typename OnDone> +struct producer_with_next_done { + producer producer; + OnNext next; + OnDone done; +}; + +template < + typename Value, + typename Error, + typename OnError, + typename OnDone> +struct producer_with_error_done { + producer producer; + OnError error; + OnDone done; +}; + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError, + typename OnDone> +struct producer_with_next_error_done { + producer producer; + OnNext next; + OnError error; + OnDone done; +}; + +struct lifetime_holder { + lifetime &alive_while; +}; + +} // namespace details + +template +inline details::next_holder> on_next(OnNext &&handler) { + return { std::forward(handler) }; +} + +template +inline details::error_holder> on_error(OnError &&handler) { + return { std::forward(handler) }; +} + +template +inline details::done_holder> on_done(OnDone &&handler) { + return { std::forward(handler) }; +} + +inline details::lifetime_holder start(lifetime &alive_while) { + return { alive_while }; +} + +namespace details { + +template < + typename Value, + typename Error, + typename OnNext, + typename = decltype(std::declval()(std::declval()))> +inline producer_with_next operator|( + producer &&producer, + next_holder &&handler) { + return { std::move(producer), std::move(handler.next) }; +} + +template < + typename Value, + typename Error, + typename OnError, + typename = decltype(std::declval()(std::declval()))> +inline producer_with_error operator|( + producer &&producer, + error_holder &&handler) { + return { std::move(producer), std::move(handler.error) }; +} + +template < + typename Value, + typename Error, + typename OnDone, + typename = decltype(std::declval()())> +inline producer_with_done operator|( + producer &&producer, + done_holder &&handler) { + return { std::move(producer), std::move(handler.done) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()(std::declval()))> +inline producer_with_next_error operator|( + producer_with_next &&producer_with_next, + error_holder &&handler) { + return { + std::move(producer_with_next.producer), + std::move(producer_with_next.next), + std::move(handler.error) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()(std::declval()))> +inline producer_with_next_error operator|( + producer_with_error &&producer_with_error, + next_holder &&handler) { + return { + std::move(producer_with_error.producer), + std::move(handler.next), + std::move(producer_with_error.error) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnDone, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()())> +inline producer_with_next_done operator|( + producer_with_next &&producer_with_next, + done_holder &&handler) { + return { + std::move(producer_with_next.producer), + std::move(producer_with_next.next), + std::move(handler.done) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnDone, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()())> +inline producer_with_next_done operator|( + producer_with_done &&producer_with_done, + next_holder &&handler) { + return { + std::move(producer_with_done.producer), + std::move(handler.next), + std::move(producer_with_done.done) }; +} + +template < + typename Value, + typename Error, + typename OnError, + typename OnDone, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()())> +inline producer_with_error_done operator|( + producer_with_error &&producer_with_error, + done_holder &&handler) { + return { + std::move(producer_with_error.producer), + std::move(producer_with_error.error), + std::move(handler.done) }; +} + +template < + typename Value, + typename Error, + typename OnError, + typename OnDone, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()())> +inline producer_with_error_done operator|( + producer_with_done &&producer_with_done, + error_holder &&handler) { + return { + std::move(producer_with_done.producer), + std::move(handler.error), + std::move(producer_with_done.done) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError, + typename OnDone, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()())> +inline producer_with_next_error_done< + Value, + Error, + OnNext, + OnError, + OnDone> operator|( + producer_with_next_error< + Value, + Error, + OnNext, + OnError> &&producer_with_next_error, + done_holder &&handler) { + return { + std::move(producer_with_next_error.producer), + std::move(producer_with_next_error.next), + std::move(producer_with_next_error.error), + std::move(handler.done) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError, + typename OnDone, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()())> +inline producer_with_next_error_done< + Value, + Error, + OnNext, + OnError, + OnDone> operator|( + producer_with_next_done< + Value, + Error, + OnNext, + OnDone> &&producer_with_next_done, + error_holder &&handler) { + return { + std::move(producer_with_next_done.producer), + std::move(producer_with_next_done.next), + std::move(handler.error), + std::move(producer_with_next_done.done) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError, + typename OnDone, + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()(std::declval())), + typename = decltype(std::declval()())> +inline producer_with_next_error_done< + Value, + Error, + OnNext, + OnError, + OnDone> operator|( + producer_with_error_done< + Value, + Error, + OnError, + OnDone> &&producer_with_error_done, + next_holder &&handler) { + return { + std::move(producer_with_error_done.producer), + std::move(handler.next), + std::move(producer_with_error_done.error), + std::move(producer_with_error_done.done) }; +} + +template < + typename Value, + typename Error, + typename OnNext, + typename OnError, + typename OnDone> +inline void operator|( + producer_with_next_error_done< + Value, + Error, + OnNext, + OnError, + OnDone> &&producer_with_next_error_done, + lifetime_holder &&lifetime) { + lifetime.alive_while.add(producer_with_next_error_done.producer.start( + std::move(producer_with_next_error_done.next), + std::move(producer_with_next_error_done.error), + std::move(producer_with_next_error_done.done))); +} + +template +inline void operator|( + producer &&producer, + lifetime_holder &&start_with_lifetime) { + return std::move(producer) + | on_next([](Value) {}) + | on_error([](Error) {}) + | on_done([] {}) + | std::move(start_with_lifetime); +} + +template +inline void operator|( + producer_with_next &&producer_with_next, + lifetime_holder &&start_with_lifetime) { + return std::move(producer_with_next) + | on_error([](Error) {}) + | on_done([] {}) + | std::move(start_with_lifetime); +} + +template +inline void operator|( + producer_with_error &&producer_with_error, + lifetime_holder &&start_with_lifetime) { + return std::move(producer_with_error) + | on_next([](Value) {}) + | on_done([] {}) + | std::move(start_with_lifetime); +} + +template +inline void operator|( + producer_with_done &&producer_with_done, + lifetime_holder &&start_with_lifetime) { + return std::move(producer_with_done) + | on_next([](Value) {}) + | on_error([](Error) {}) + | std::move(start_with_lifetime); +} + +template +inline void operator|( + producer_with_next_error< + Value, + Error, + OnNext, + OnError> &&producer_with_next_error, + lifetime_holder &&start_with_lifetime) { + return std::move(producer_with_next_error) + | on_done([] {}) + | std::move(start_with_lifetime); +} + +template +inline void operator|( + producer_with_next_done< + Value, + Error, + OnNext, + OnDone> &&producer_with_next_done, + lifetime_holder &&start_with_lifetime) { + return std::move(producer_with_next_done) + | on_error([](Error) {}) + | std::move(start_with_lifetime); +} + +template +inline void operator|( + producer_with_error_done< + Value, + Error, + OnError, + OnDone> &&producer_with_error_done, + lifetime_holder &&start_with_lifetime) { + return std::move(producer_with_error_done) + | on_next([](Value) {}) + | std::move(start_with_lifetime); +} + +} // namespace details } // namespace rpl diff --git a/Telegram/SourceFiles/rpl/producer_tests.cpp b/Telegram/SourceFiles/rpl/producer_tests.cpp index c1d7f3805..dd61d1d6e 100644 --- a/Telegram/SourceFiles/rpl/producer_tests.cpp +++ b/Telegram/SourceFiles/rpl/producer_tests.cpp @@ -27,7 +27,8 @@ using namespace rpl; class OnDestructor { public: - OnDestructor(base::lambda_once callback) : _callback(std::move(callback)) { + OnDestructor(base::lambda_once callback) + : _callback(std::move(callback)) { } ~OnDestructor() { if (_callback) { @@ -52,22 +53,22 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { }); { producer([=](auto consumer) { - destroyCaller; - consumer.putNext(1); - consumer.putNext(2); - consumer.putNext(3); - consumer.putDone(); + (void)destroyCaller; + consumer.put_next(1); + consumer.put_next(2); + consumer.put_next(3); + consumer.put_done(); return [=] { - destroyCaller; + (void)destroyCaller; *lifetimeEnded = true; }; }).start([=](int value) { - destroyCaller; + (void)destroyCaller; *sum += value; }, [=](no_error) { - destroyCaller; + (void)destroyCaller; }, [=]() { - destroyCaller; + (void)destroyCaller; *doneGenerated = true; }); } @@ -82,7 +83,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { auto errorGenerated = std::make_shared(false); { producer([=](auto consumer) { - consumer.putError(true); + consumer.put_error(true); return lifetime(); }).start([=](no_value) { }, [=](bool error) { @@ -124,9 +125,9 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { { saved = producer([=](auto consumer) { auto inner = producer([=](auto consumer) { - consumer.putNext(1); - consumer.putNext(2); - consumer.putNext(3); + consumer.put_next(1); + consumer.put_next(2); + consumer.put_next(3); return [=] { ++*lifetimeEndCount; }; @@ -135,12 +136,12 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { ++*lifetimeEndCount; }); result.add(inner.start([=](int value) { - consumer.putNext(value); + consumer.put_next(value); }, [=](no_error) { }, [=] { })); result.add(inner.start([=](int value) { - consumer.putNext(value); + consumer.put_next(value); }, [=](no_error) { }, [=] { })); @@ -159,7 +160,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { SECTION("event_stream basic test") { auto sum = std::make_shared(0); - rpl::event_stream stream; + event_stream stream; stream.fire(1); stream.fire(2); stream.fire(3); @@ -182,7 +183,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { SECTION("event_stream add in handler test") { auto sum = std::make_shared(0); - rpl::event_stream stream; + event_stream stream; { auto composite = lifetime(); @@ -233,7 +234,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { SECTION("event_stream add and remove in handler test") { auto sum = std::make_shared(0); - rpl::event_stream stream; + event_stream stream; { auto composite = lifetime(); @@ -286,7 +287,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { auto sum = std::make_shared(0); lifetime extended; { - rpl::event_stream stream; + event_stream stream; extended = stream.events().start([=](int value) { *sum += value; }, [=](no_error) { @@ -298,5 +299,150 @@ TEST_CASE("basic producer tests", "[rpl::producer]") { } REQUIRE(*sum == 1 + 2 + 3); } + + SECTION("event_stream move test") { + auto sum = std::make_shared(0); + lifetime extended; + { + event_stream stream; + stream.events() | on_next([=](int value) { + *sum += value; + }) | start(extended); + + stream.fire(1); + stream.fire(2); + + auto movedStream = std::move(stream); + movedStream.fire(3); + movedStream.fire(4); + } + REQUIRE(*sum == 1 + 2 + 3 + 4); + } } +TEST_CASE("basic piping tests", "[rpl::producer]") { + SECTION("bind_on_next, bind_on_error, bind_on_done") { + auto sum = std::make_shared(0); + auto doneGenerated = std::make_shared(false); + { + auto alive = lifetime(); + producer([=](auto consumer) { + consumer.put_next(1); + consumer.put_next(2); + consumer.put_next(3); + consumer.put_done(); + return lifetime(); + }) | bind_on_next([=](int value) { + *sum += value; + }) | bind_on_done([=]() { + *doneGenerated = true; + }) | start(alive); + + producer([=](auto consumer) { + consumer.put_error(4); + return lifetime(); + }) | bind_on_error([=](int value) { + *sum += value; + }) | bind_on_done([=]() { + *doneGenerated = false; + }) | start(alive); + } + REQUIRE(*sum == 1 + 2 + 3 + 4); + REQUIRE(*doneGenerated); + } + + SECTION("on_next, on_error, on_done") { + auto sum = std::make_shared(0); + auto dones = std::make_shared(0); + { + auto alive = lifetime(); + producer([=](auto consumer) { + consumer.put_next(1); + consumer.put_done(); + return lifetime(); + }) | on_next([=](int value) { + *sum += value; + }) | start(alive); + + producer([=](auto consumer) { + consumer.put_next(11); + consumer.put_error(111); + return lifetime(); + }) | on_error([=](int value) { + *sum += value; + }) | start(alive); + + producer([=](auto consumer) { + consumer.put_next(1111); + consumer.put_done(); + return lifetime(); + }) | on_done([=]() { + *dones += 1; + }) | start(alive); + + producer([=](auto consumer) { + consumer.put_next(11111); + consumer.put_next(11112); + consumer.put_next(11113); + consumer.put_error(11114); + return lifetime(); + }) | on_next([=](int value) { + *sum += value; + }) | on_error([=](int value) { + *sum += value; + }) | start(alive); + } + + auto alive = lifetime(); + producer([=](auto consumer) { + consumer.put_next(111111); + consumer.put_next(111112); + consumer.put_next(111113); + consumer.put_done(); + return lifetime(); + }) | on_next([=](int value) { + *sum += value; + }) | on_done([=]() { + *dones += 11; + }) | start(alive); + + producer([=](auto consumer) { + consumer.put_error(1111111); + return lifetime(); + }) | on_error([=](int value) { + *sum += value; + }) | on_done([=]() { + *dones = 0; + }) | start(alive); + + REQUIRE(*sum == + 1 + + 111 + + 11111 + 11112 + 11113 + 11114 + + 111111 + 111112 + 111113 + + 1111111); + REQUIRE(*dones == 1 + 11); + } + + SECTION("on_next should copy its callback") { + auto sum = std::make_shared(0); + { + auto next = [=](int value) { + REQUIRE(sum != nullptr); + *sum += value; + }; + + for (int i = 0; i != 3; ++i) { + auto alive = lifetime(); + producer([=](auto consumer) { + consumer.put_next(1); + consumer.put_done(); + return lifetime(); + }) + | on_next(next) + | start(alive); + } + } + REQUIRE(*sum == 3); + } +}