Add piping and on_next, on_error, on_done, start.

This commit is contained in:
John Preston 2017-09-04 14:24:35 +03:00
parent 101fdb1fba
commit e70052e966
7 changed files with 792 additions and 112 deletions

View File

@ -22,6 +22,16 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
namespace base { namespace base {
template <typename Type>
inline Type take(Type &value) {
return std::exchange(value, Type {});
}
template <typename Type, size_t Size>
inline constexpr size_t array_size(const Type(&)[Size]) {
return Size;
}
// This version of remove_if allows predicate to push_back() items. // This version of remove_if allows predicate to push_back() items.
// The added items won't be tested for predicate but just left in the container. // The added items won't be tested for predicate but just left in the container.
template <typename Container, typename Predicate> template <typename Container, typename Predicate>

View File

@ -22,6 +22,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "core/basic_types.h" #include "core/basic_types.h"
#include "base/flags.h" #include "base/flags.h"
#include "base/algorithm.h"
// Define specializations for QByteArray for Qt 5.3.2, because // Define specializations for QByteArray for Qt 5.3.2, because
// QByteArray in Qt 5.3.2 doesn't declare "pointer" subtype. // QByteArray in Qt 5.3.2 doesn't declare "pointer" subtype.
@ -42,17 +43,6 @@ inline span<const char> make_span(const QByteArray &cont) {
#endif // OS_MAC_OLD #endif // OS_MAC_OLD
namespace base { namespace base {
template <typename T, size_t N>
inline constexpr size_t array_size(const T(&)[N]) {
return N;
}
template <typename T>
inline T take(T &source) {
return std::exchange(source, T());
}
namespace internal { namespace internal {
template <typename D, typename T> template <typename D, typename T>

View File

@ -25,11 +25,14 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
namespace rpl { namespace rpl {
struct no_value { template <char Tag>
struct no_type {
no_type() = delete;
}; };
using no_value = no_type<'V'>;
using no_error = no_type<'E'>;
struct no_error { struct empty_value {
no_error() = delete;
}; };
template <typename Value, typename Error> template <typename Value, typename Error>
@ -47,11 +50,11 @@ public:
OnError &&error, OnError &&error,
OnDone &&done); OnDone &&done);
bool putNext(Value value) const; bool put_next(Value value) const;
void putError(Error error) const; void put_error(Error error) const;
void putDone() const; void put_done() const;
void setLifetime(lifetime &&lifetime) const; void set_lifetime(lifetime &&lifetime) const;
void terminate() const; void terminate() const;
bool operator==(const consumer &other) const { bool operator==(const consumer &other) const {
@ -93,11 +96,11 @@ private:
template <typename Value, typename Error> template <typename Value, typename Error>
class consumer<Value, Error>::abstract_consumer_instance { class consumer<Value, Error>::abstract_consumer_instance {
public: public:
virtual bool putNext(Value value) = 0; virtual bool put_next(Value value) = 0;
virtual void putError(Error error) = 0; virtual void put_error(Error error) = 0;
virtual void putDone() = 0; virtual void put_done() = 0;
void setLifetime(lifetime &&lifetime); void set_lifetime(lifetime &&lifetime);
void terminate(); void terminate();
protected: protected:
@ -122,9 +125,9 @@ public:
, _done(std::forward<OnDoneImpl>(done)) { , _done(std::forward<OnDoneImpl>(done)) {
} }
bool putNext(Value value) override; bool put_next(Value value) override;
void putError(Error error) override; void put_error(Error error) override;
void putDone() override; void put_done() override;
private: private:
OnNext _next; OnNext _next;
@ -167,9 +170,9 @@ consumer<Value, Error>::consumer(
} }
template <typename Value, typename Error> template <typename Value, typename Error>
bool consumer<Value, Error>::putNext(Value value) const { bool consumer<Value, Error>::put_next(Value value) const {
if (_instance) { if (_instance) {
if (_instance->putNext(std::move(value))) { if (_instance->put_next(std::move(value))) {
return true; return true;
} }
_instance = nullptr; _instance = nullptr;
@ -178,23 +181,23 @@ bool consumer<Value, Error>::putNext(Value value) const {
} }
template <typename Value, typename Error> template <typename Value, typename Error>
void consumer<Value, Error>::putError(Error error) const { void consumer<Value, Error>::put_error(Error error) const {
if (_instance) { if (_instance) {
std::exchange(_instance, nullptr)->putError(std::move(error)); std::exchange(_instance, nullptr)->put_error(std::move(error));
} }
} }
template <typename Value, typename Error> template <typename Value, typename Error>
void consumer<Value, Error>::putDone() const { void consumer<Value, Error>::put_done() const {
if (_instance) { if (_instance) {
std::exchange(_instance, nullptr)->putDone(); std::exchange(_instance, nullptr)->put_done();
} }
} }
template <typename Value, typename Error> template <typename Value, typename Error>
void consumer<Value, Error>::setLifetime(lifetime &&lifetime) const { void consumer<Value, Error>::set_lifetime(lifetime &&lifetime) const {
if (_instance) { if (_instance) {
_instance->setLifetime(std::move(lifetime)); _instance->set_lifetime(std::move(lifetime));
} else { } else {
lifetime.destroy(); lifetime.destroy();
} }
@ -208,7 +211,7 @@ void consumer<Value, Error>::terminate() const {
} }
template <typename Value, typename Error> template <typename Value, typename Error>
void consumer<Value, Error>::abstract_consumer_instance::setLifetime( void consumer<Value, Error>::abstract_consumer_instance::set_lifetime(
lifetime &&lifetime) { lifetime &&lifetime) {
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
if (_terminated) { if (_terminated) {
@ -234,7 +237,7 @@ void consumer<Value, Error>::abstract_consumer_instance::terminate() {
template <typename Value, typename Error> template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone> template <typename OnNext, typename OnError, typename OnDone>
bool consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::putNext( bool consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::put_next(
Value value) { Value value) {
std::unique_lock<std::mutex> lock(this->_mutex); std::unique_lock<std::mutex> lock(this->_mutex);
if (this->_terminated) { if (this->_terminated) {
@ -249,7 +252,7 @@ bool consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::putNext
template <typename Value, typename Error> template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone> template <typename OnNext, typename OnError, typename OnDone>
void consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::putError( void consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::put_error(
Error error) { Error error) {
std::unique_lock<std::mutex> lock(this->_mutex); std::unique_lock<std::mutex> lock(this->_mutex);
if (!this->_terminated) { if (!this->_terminated) {
@ -263,7 +266,7 @@ void consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::putErro
template <typename Value, typename Error> template <typename Value, typename Error>
template <typename OnNext, typename OnError, typename OnDone> template <typename OnNext, typename OnError, typename OnDone>
void consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::putDone() { void consumer<Value, Error>::consumer_instance<OnNext, OnError, OnDone>::put_done() {
std::unique_lock<std::mutex> lock(this->_mutex); std::unique_lock<std::mutex> lock(this->_mutex);
if (!this->_terminated) { if (!this->_terminated) {
auto handler = std::move(this->_done); auto handler = std::move(this->_done);

View File

@ -22,6 +22,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "producer.h" #include "producer.h"
#include "base/algorithm.h" #include "base/algorithm.h"
#include "base/assertion.h"
namespace rpl { namespace rpl {
@ -29,53 +30,53 @@ template <typename Value>
class event_stream { class event_stream {
public: public:
event_stream(); event_stream();
event_stream(event_stream &&other);
void fire(Value value); void fire(Value value);
producer<Value, no_error> events(); producer<Value, no_error> events() const;
~event_stream(); ~event_stream();
private: private:
std::weak_ptr<event_stream*> weak() const { std::weak_ptr<std::vector<consumer<Value, no_error>>> weak() const {
return _strong; return _consumers;
}
void addConsumer(consumer<Value, no_error> &&consumer) {
_consumers.push_back(std::move(consumer));
}
void removeConsumer(const consumer<Value, no_error> &consumer) {
auto it = base::find(_consumers, consumer);
if (it != _consumers.end()) {
it->terminate();
}
} }
std::shared_ptr<event_stream*> _strong; std::shared_ptr<std::vector<consumer<Value, no_error>>> _consumers;
std::vector<consumer<Value, no_error>> _consumers;
}; };
template <typename Value> template <typename Value>
event_stream<Value>::event_stream() event_stream<Value>::event_stream()
: _strong(std::make_shared<event_stream*>(this)) { : _consumers(std::make_shared<std::vector<consumer<Value, no_error>>>()) {
}
template <typename Value>
event_stream<Value>::event_stream(event_stream &&other)
: _consumers(base::take(other._consumers)) {
} }
template <typename Value> template <typename Value>
void event_stream<Value>::fire(Value value) { void event_stream<Value>::fire(Value value) {
base::push_back_safe_remove_if(_consumers, [&](auto &consumer) { Expects(_consumers != nullptr);
return !consumer.putNext(value); base::push_back_safe_remove_if(*_consumers, [&](auto &consumer) {
return !consumer.put_next(value);
}); });
} }
template <typename Value> template <typename Value>
producer<Value, no_error> event_stream<Value>::events() { producer<Value, no_error> event_stream<Value>::events() const {
return producer<Value, no_error>([weak = weak()](consumer<Value, no_error> consumer) { return producer<Value, no_error>([weak = weak()](consumer<Value, no_error> consumer) {
if (auto strong = weak.lock()) { if (auto strong = weak.lock()) {
auto result = [weak, consumer] { auto result = [weak, consumer] {
if (auto strong = weak.lock()) { if (auto strong = weak.lock()) {
(*strong)->removeConsumer(consumer); auto it = base::find(*strong, consumer);
if (it != strong->end()) {
it->terminate();
}
} }
}; };
(*strong)->addConsumer(std::move(consumer)); strong->push_back(std::move(consumer));
return lifetime(std::move(result)); return lifetime(std::move(result));
} }
return lifetime(); return lifetime();
@ -84,8 +85,10 @@ producer<Value, no_error> event_stream<Value>::events() {
template <typename Value> template <typename Value>
event_stream<Value>::~event_stream() { event_stream<Value>::~event_stream() {
for (auto &consumer : _consumers) { if (_consumers) {
consumer.putDone(); for (auto &consumer : *_consumers) {
consumer.put_done();
}
} }
} }

View File

@ -22,7 +22,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "base/lambda.h" #include "base/lambda.h"
#include "base/algorithm.h" #include "base/algorithm.h"
#include <functional> #include <deque>
namespace rpl { namespace rpl {
@ -33,45 +33,64 @@ public:
lifetime &operator=(lifetime &&other); lifetime &operator=(lifetime &&other);
template <typename Destroy, typename = decltype(std::declval<Destroy>()())> template <typename Destroy, typename = decltype(std::declval<Destroy>()())>
lifetime(Destroy &&destroy) : _destroy(std::forward<Destroy>(destroy)) { lifetime(Destroy &&destroy);
explicit operator bool() const { return !_callbacks.empty(); }
template <typename Destroy, typename = decltype(std::declval<Destroy>()())>
void add(Destroy &&destroy);
void add(lifetime &&other);
void destroy();
template <typename Type, typename... Args>
Type *make_state(Args&& ...args) {
auto result = new Type(std::forward<Args>(args)...);
add([result]() mutable {
static_assert(sizeof(Type) > 0, "Can't delete unknown type.");
delete base::take(result);
});
return result;
} }
void add(lifetime other) { ~lifetime() { destroy(); }
_nested.push_back(std::move(other));
}
void destroy() {
auto nested = std::exchange(_nested, std::vector<lifetime>());
auto callback = std::exchange(_destroy, base::lambda_once<void()>());
if (!nested.empty()) {
nested.clear();
}
if (callback) {
callback();
}
}
~lifetime() {
destroy();
}
private: private:
base::lambda_once<void()> _destroy; std::deque<base::lambda_once<void()>> _callbacks;
std::vector<lifetime> _nested;
}; };
lifetime::lifetime(lifetime &&other) inline lifetime::lifetime(lifetime &&other)
: _destroy(std::exchange(other._destroy, base::lambda_once<void()>())) : _callbacks(base::take(other._callbacks)) {
, _nested(std::exchange(other._nested, std::vector<lifetime>())) {
} }
lifetime &lifetime::operator=(lifetime &&other) { inline lifetime &lifetime::operator=(lifetime &&other) {
std::swap(_destroy, other._destroy); std::swap(_callbacks, other._callbacks);
std::swap(_nested, other._nested);
other.destroy(); other.destroy();
return *this; return *this;
} }
template <typename Destroy, typename>
inline lifetime::lifetime(Destroy &&destroy) {
_callbacks.emplace_back(std::forward<Destroy>(destroy));
}
template <typename Destroy, typename>
inline void lifetime::add(Destroy &&destroy) {
_callbacks.push_front(destroy);
}
inline void lifetime::add(lifetime &&other) {
auto callbacks = base::take(other._callbacks);
_callbacks.insert(
_callbacks.begin(),
std::make_move_iterator(callbacks.begin()),
std::make_move_iterator(callbacks.end()));
}
inline void lifetime::destroy() {
for (auto &callback : base::take(_callbacks)) {
callback();
}
}
} // namespace rpl } // namespace rpl

View File

@ -26,9 +26,12 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
namespace rpl { namespace rpl {
template <typename Value, typename Error> template <typename Value, typename Error = no_error>
class producer { class producer {
public: public:
using value_type = Value;
using error_type = Error;
template <typename Generator, typename = std::enable_if<std::is_convertible< template <typename Generator, typename = std::enable_if<std::is_convertible<
decltype(std::declval<Generator>()(std::declval<consumer<Value, Error>>())), decltype(std::declval<Generator>()(std::declval<consumer<Value, Error>>())),
lifetime lifetime
@ -45,7 +48,7 @@ public:
lifetime start( lifetime start(
OnNext &&next, OnNext &&next,
OnError &&error, OnError &&error,
OnDone &&done); OnDone &&done) const;
private: private:
base::lambda<lifetime(consumer<Value, Error>)> _generator; base::lambda<lifetime(consumer<Value, Error>)> _generator;
@ -69,13 +72,519 @@ template <
lifetime producer<Value, Error>::start( lifetime producer<Value, Error>::start(
OnNext &&next, OnNext &&next,
OnError &&error, OnError &&error,
OnDone &&done) { OnDone &&done) const {
auto result = consumer<Value, Error>( auto result = consumer<Value, Error>(
std::forward<OnNext>(next), std::forward<OnNext>(next),
std::forward<OnError>(error), std::forward<OnError>(error),
std::forward<OnDone>(done)); std::forward<OnDone>(done));
result.setLifetime(_generator(result)); result.set_lifetime(_generator(result));
return [result] { result.terminate(); }; return [result] { result.terminate(); };
} }
template <typename Value, typename Error>
inline producer<Value, Error> duplicate(const producer<Value, Error> &producer) {
return producer;
}
template <
typename Value,
typename Error,
typename Method,
typename = decltype(std::declval<Method>()(std::declval<producer<Value, Error>>()))>
inline decltype(auto) operator|(producer<Value, Error> &&producer, Method &&method) {
return std::forward<Method>(method)(std::move(producer));
}
template <typename OnNext>
inline decltype(auto) bind_on_next(OnNext &&handler) {
return [handler = std::forward<OnNext>(handler)](auto &&existing) mutable {
using value_type = typename std::decay_t<decltype(existing)>::value_type;
using error_type = typename std::decay_t<decltype(existing)>::error_type;
return producer<no_value, error_type>([
existing = std::move(existing),
handler = std::forward<OnNext>(handler)
](consumer<no_value, error_type> consumer) {
return existing.start([handler = std::decay_t<OnNext>(handler)](
value_type value) {
handler(value);
}, [consumer](error_type error) {
consumer.put_error(error);
}, [consumer] {
consumer.put_done();
});
});
};
}
template <typename OnError>
inline decltype(auto) bind_on_error(OnError &&handler) {
return [handler = std::forward<OnError>(handler)](auto &&existing) mutable {
using value_type = typename std::decay_t<decltype(existing)>::value_type;
using error_type = typename std::decay_t<decltype(existing)>::error_type;
return producer<value_type, no_error>([
existing = std::move(existing),
handler = std::forward<OnError>(handler)
](consumer<value_type, no_error> consumer) {
return existing.start([consumer](value_type value) {
consumer.put_next(value);
}, [handler = std::decay_t<OnError>(handler)](error_type value) {
handler(value);
}, [consumer] {
consumer.put_done();
});
});
};
}
template <typename OnDone>
inline decltype(auto) bind_on_done(OnDone &&handler) {
return [handler = std::forward<OnDone>(handler)](auto &&existing) mutable {
using value_type = typename std::decay_t<decltype(existing)>::value_type;
using error_type = typename std::decay_t<decltype(existing)>::error_type;
return producer<value_type, error_type>([
existing = std::move(existing),
handler = std::forward<OnDone>(handler)
](consumer<value_type, error_type> consumer) {
return existing.start([consumer](value_type value) {
consumer.put_next(value);
}, [consumer](error_type value) {
consumer.put_error(value);
}, [handler = std::decay_t<OnDone>(handler)] {
handler();
});
});
};
}
namespace details {
template <typename OnNext>
struct next_holder {
OnNext next;
};
template <typename OnError>
struct error_holder {
OnError error;
};
template <typename OnDone>
struct done_holder {
OnDone done;
};
template <
typename Value,
typename Error,
typename OnNext>
struct producer_with_next {
producer<Value, Error> producer;
OnNext next;
};
template <
typename Value,
typename Error,
typename OnError>
struct producer_with_error {
producer<Value, Error> producer;
OnError error;
};
template <
typename Value,
typename Error,
typename OnDone>
struct producer_with_done {
producer<Value, Error> producer;
OnDone done;
};
template <
typename Value,
typename Error,
typename OnNext,
typename OnError>
struct producer_with_next_error {
producer<Value, Error> producer;
OnNext next;
OnError error;
};
template <
typename Value,
typename Error,
typename OnNext,
typename OnDone>
struct producer_with_next_done {
producer<Value, Error> producer;
OnNext next;
OnDone done;
};
template <
typename Value,
typename Error,
typename OnError,
typename OnDone>
struct producer_with_error_done {
producer<Value, Error> producer;
OnError error;
OnDone done;
};
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
struct producer_with_next_error_done {
producer<Value, Error> producer;
OnNext next;
OnError error;
OnDone done;
};
struct lifetime_holder {
lifetime &alive_while;
};
} // namespace details
template <typename OnNext>
inline details::next_holder<std::decay_t<OnNext>> on_next(OnNext &&handler) {
return { std::forward<OnNext>(handler) };
}
template <typename OnError>
inline details::error_holder<std::decay_t<OnError>> on_error(OnError &&handler) {
return { std::forward<OnError>(handler) };
}
template <typename OnDone>
inline details::done_holder<std::decay_t<OnDone>> on_done(OnDone &&handler) {
return { std::forward<OnDone>(handler) };
}
inline details::lifetime_holder start(lifetime &alive_while) {
return { alive_while };
}
namespace details {
template <
typename Value,
typename Error,
typename OnNext,
typename = decltype(std::declval<OnNext>()(std::declval<Value>()))>
inline producer_with_next<Value, Error, OnNext> operator|(
producer<Value, Error> &&producer,
next_holder<OnNext> &&handler) {
return { std::move(producer), std::move(handler.next) };
}
template <
typename Value,
typename Error,
typename OnError,
typename = decltype(std::declval<OnError>()(std::declval<Error>()))>
inline producer_with_error<Value, Error, OnError> operator|(
producer<Value, Error> &&producer,
error_holder<OnError> &&handler) {
return { std::move(producer), std::move(handler.error) };
}
template <
typename Value,
typename Error,
typename OnDone,
typename = decltype(std::declval<OnDone>()())>
inline producer_with_done<Value, Error, OnDone> operator|(
producer<Value, Error> &&producer,
done_holder<OnDone> &&handler) {
return { std::move(producer), std::move(handler.done) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnError>()(std::declval<Error>()))>
inline producer_with_next_error<Value, Error, OnNext, OnError> operator|(
producer_with_next<Value, Error, OnNext> &&producer_with_next,
error_holder<OnError> &&handler) {
return {
std::move(producer_with_next.producer),
std::move(producer_with_next.next),
std::move(handler.error) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnError>()(std::declval<Error>()))>
inline producer_with_next_error<Value, Error, OnNext, OnError> operator|(
producer_with_error<Value, Error, OnError> &&producer_with_error,
next_holder<OnNext> &&handler) {
return {
std::move(producer_with_error.producer),
std::move(handler.next),
std::move(producer_with_error.error) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnDone,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnDone>()())>
inline producer_with_next_done<Value, Error, OnNext, OnDone> operator|(
producer_with_next<Value, Error, OnNext> &&producer_with_next,
done_holder<OnDone> &&handler) {
return {
std::move(producer_with_next.producer),
std::move(producer_with_next.next),
std::move(handler.done) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnDone,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnDone>()())>
inline producer_with_next_done<Value, Error, OnNext, OnDone> operator|(
producer_with_done<Value, Error, OnDone> &&producer_with_done,
next_holder<OnNext> &&handler) {
return {
std::move(producer_with_done.producer),
std::move(handler.next),
std::move(producer_with_done.done) };
}
template <
typename Value,
typename Error,
typename OnError,
typename OnDone,
typename = decltype(std::declval<OnError>()(std::declval<Error>())),
typename = decltype(std::declval<OnDone>()())>
inline producer_with_error_done<Value, Error, OnError, OnDone> operator|(
producer_with_error<Value, Error, OnError> &&producer_with_error,
done_holder<OnDone> &&handler) {
return {
std::move(producer_with_error.producer),
std::move(producer_with_error.error),
std::move(handler.done) };
}
template <
typename Value,
typename Error,
typename OnError,
typename OnDone,
typename = decltype(std::declval<OnError>()(std::declval<Error>())),
typename = decltype(std::declval<OnDone>()())>
inline producer_with_error_done<Value, Error, OnError, OnDone> operator|(
producer_with_done<Value, Error, OnDone> &&producer_with_done,
error_holder<OnError> &&handler) {
return {
std::move(producer_with_done.producer),
std::move(handler.error),
std::move(producer_with_done.done) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnError>()(std::declval<Error>())),
typename = decltype(std::declval<OnDone>()())>
inline producer_with_next_error_done<
Value,
Error,
OnNext,
OnError,
OnDone> operator|(
producer_with_next_error<
Value,
Error,
OnNext,
OnError> &&producer_with_next_error,
done_holder<OnDone> &&handler) {
return {
std::move(producer_with_next_error.producer),
std::move(producer_with_next_error.next),
std::move(producer_with_next_error.error),
std::move(handler.done) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnError>()(std::declval<Error>())),
typename = decltype(std::declval<OnDone>()())>
inline producer_with_next_error_done<
Value,
Error,
OnNext,
OnError,
OnDone> operator|(
producer_with_next_done<
Value,
Error,
OnNext,
OnDone> &&producer_with_next_done,
error_holder<OnError> &&handler) {
return {
std::move(producer_with_next_done.producer),
std::move(producer_with_next_done.next),
std::move(handler.error),
std::move(producer_with_next_done.done) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone,
typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
typename = decltype(std::declval<OnError>()(std::declval<Error>())),
typename = decltype(std::declval<OnDone>()())>
inline producer_with_next_error_done<
Value,
Error,
OnNext,
OnError,
OnDone> operator|(
producer_with_error_done<
Value,
Error,
OnError,
OnDone> &&producer_with_error_done,
next_holder<OnNext> &&handler) {
return {
std::move(producer_with_error_done.producer),
std::move(handler.next),
std::move(producer_with_error_done.error),
std::move(producer_with_error_done.done) };
}
template <
typename Value,
typename Error,
typename OnNext,
typename OnError,
typename OnDone>
inline void operator|(
producer_with_next_error_done<
Value,
Error,
OnNext,
OnError,
OnDone> &&producer_with_next_error_done,
lifetime_holder &&lifetime) {
lifetime.alive_while.add(producer_with_next_error_done.producer.start(
std::move(producer_with_next_error_done.next),
std::move(producer_with_next_error_done.error),
std::move(producer_with_next_error_done.done)));
}
template <typename Value, typename Error>
inline void operator|(
producer<Value, Error> &&producer,
lifetime_holder &&start_with_lifetime) {
return std::move(producer)
| on_next([](Value) {})
| on_error([](Error) {})
| on_done([] {})
| std::move(start_with_lifetime);
}
template <typename Value, typename Error, typename OnNext>
inline void operator|(
producer_with_next<Value, Error, OnNext> &&producer_with_next,
lifetime_holder &&start_with_lifetime) {
return std::move(producer_with_next)
| on_error([](Error) {})
| on_done([] {})
| std::move(start_with_lifetime);
}
template <typename Value, typename Error, typename OnError>
inline void operator|(
producer_with_error<Value, Error, OnError> &&producer_with_error,
lifetime_holder &&start_with_lifetime) {
return std::move(producer_with_error)
| on_next([](Value) {})
| on_done([] {})
| std::move(start_with_lifetime);
}
template <typename Value, typename Error, typename OnDone>
inline void operator|(
producer_with_done<Value, Error, OnDone> &&producer_with_done,
lifetime_holder &&start_with_lifetime) {
return std::move(producer_with_done)
| on_next([](Value) {})
| on_error([](Error) {})
| std::move(start_with_lifetime);
}
template <typename Value, typename Error, typename OnNext, typename OnError>
inline void operator|(
producer_with_next_error<
Value,
Error,
OnNext,
OnError> &&producer_with_next_error,
lifetime_holder &&start_with_lifetime) {
return std::move(producer_with_next_error)
| on_done([] {})
| std::move(start_with_lifetime);
}
template <typename Value, typename Error, typename OnNext, typename OnDone>
inline void operator|(
producer_with_next_done<
Value,
Error,
OnNext,
OnDone> &&producer_with_next_done,
lifetime_holder &&start_with_lifetime) {
return std::move(producer_with_next_done)
| on_error([](Error) {})
| std::move(start_with_lifetime);
}
template <typename Value, typename Error, typename OnError, typename OnDone>
inline void operator|(
producer_with_error_done<
Value,
Error,
OnError,
OnDone> &&producer_with_error_done,
lifetime_holder &&start_with_lifetime) {
return std::move(producer_with_error_done)
| on_next([](Value) {})
| std::move(start_with_lifetime);
}
} // namespace details
} // namespace rpl } // namespace rpl

View File

@ -27,7 +27,8 @@ using namespace rpl;
class OnDestructor { class OnDestructor {
public: public:
OnDestructor(base::lambda_once<void()> callback) : _callback(std::move(callback)) { OnDestructor(base::lambda_once<void()> callback)
: _callback(std::move(callback)) {
} }
~OnDestructor() { ~OnDestructor() {
if (_callback) { if (_callback) {
@ -52,22 +53,22 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
}); });
{ {
producer<int, no_error>([=](auto consumer) { producer<int, no_error>([=](auto consumer) {
destroyCaller; (void)destroyCaller;
consumer.putNext(1); consumer.put_next(1);
consumer.putNext(2); consumer.put_next(2);
consumer.putNext(3); consumer.put_next(3);
consumer.putDone(); consumer.put_done();
return [=] { return [=] {
destroyCaller; (void)destroyCaller;
*lifetimeEnded = true; *lifetimeEnded = true;
}; };
}).start([=](int value) { }).start([=](int value) {
destroyCaller; (void)destroyCaller;
*sum += value; *sum += value;
}, [=](no_error) { }, [=](no_error) {
destroyCaller; (void)destroyCaller;
}, [=]() { }, [=]() {
destroyCaller; (void)destroyCaller;
*doneGenerated = true; *doneGenerated = true;
}); });
} }
@ -82,7 +83,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
auto errorGenerated = std::make_shared<bool>(false); auto errorGenerated = std::make_shared<bool>(false);
{ {
producer<no_value, bool>([=](auto consumer) { producer<no_value, bool>([=](auto consumer) {
consumer.putError(true); consumer.put_error(true);
return lifetime(); return lifetime();
}).start([=](no_value) { }).start([=](no_value) {
}, [=](bool error) { }, [=](bool error) {
@ -124,9 +125,9 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
{ {
saved = producer<int, no_error>([=](auto consumer) { saved = producer<int, no_error>([=](auto consumer) {
auto inner = producer<int, no_error>([=](auto consumer) { auto inner = producer<int, no_error>([=](auto consumer) {
consumer.putNext(1); consumer.put_next(1);
consumer.putNext(2); consumer.put_next(2);
consumer.putNext(3); consumer.put_next(3);
return [=] { return [=] {
++*lifetimeEndCount; ++*lifetimeEndCount;
}; };
@ -135,12 +136,12 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
++*lifetimeEndCount; ++*lifetimeEndCount;
}); });
result.add(inner.start([=](int value) { result.add(inner.start([=](int value) {
consumer.putNext(value); consumer.put_next(value);
}, [=](no_error) { }, [=](no_error) {
}, [=] { }, [=] {
})); }));
result.add(inner.start([=](int value) { result.add(inner.start([=](int value) {
consumer.putNext(value); consumer.put_next(value);
}, [=](no_error) { }, [=](no_error) {
}, [=] { }, [=] {
})); }));
@ -159,7 +160,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
SECTION("event_stream basic test") { SECTION("event_stream basic test") {
auto sum = std::make_shared<int>(0); auto sum = std::make_shared<int>(0);
rpl::event_stream<int> stream; event_stream<int> stream;
stream.fire(1); stream.fire(1);
stream.fire(2); stream.fire(2);
stream.fire(3); stream.fire(3);
@ -182,7 +183,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
SECTION("event_stream add in handler test") { SECTION("event_stream add in handler test") {
auto sum = std::make_shared<int>(0); auto sum = std::make_shared<int>(0);
rpl::event_stream<int> stream; event_stream<int> stream;
{ {
auto composite = lifetime(); auto composite = lifetime();
@ -233,7 +234,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
SECTION("event_stream add and remove in handler test") { SECTION("event_stream add and remove in handler test") {
auto sum = std::make_shared<int>(0); auto sum = std::make_shared<int>(0);
rpl::event_stream<int> stream; event_stream<int> stream;
{ {
auto composite = lifetime(); auto composite = lifetime();
@ -286,7 +287,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
auto sum = std::make_shared<int>(0); auto sum = std::make_shared<int>(0);
lifetime extended; lifetime extended;
{ {
rpl::event_stream<int> stream; event_stream<int> stream;
extended = stream.events().start([=](int value) { extended = stream.events().start([=](int value) {
*sum += value; *sum += value;
}, [=](no_error) { }, [=](no_error) {
@ -298,5 +299,150 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
} }
REQUIRE(*sum == 1 + 2 + 3); REQUIRE(*sum == 1 + 2 + 3);
} }
SECTION("event_stream move test") {
auto sum = std::make_shared<int>(0);
lifetime extended;
{
event_stream<int> stream;
stream.events() | on_next([=](int value) {
*sum += value;
}) | start(extended);
stream.fire(1);
stream.fire(2);
auto movedStream = std::move(stream);
movedStream.fire(3);
movedStream.fire(4);
}
REQUIRE(*sum == 1 + 2 + 3 + 4);
}
} }
TEST_CASE("basic piping tests", "[rpl::producer]") {
SECTION("bind_on_next, bind_on_error, bind_on_done") {
auto sum = std::make_shared<int>(0);
auto doneGenerated = std::make_shared<bool>(false);
{
auto alive = lifetime();
producer<int, no_error>([=](auto consumer) {
consumer.put_next(1);
consumer.put_next(2);
consumer.put_next(3);
consumer.put_done();
return lifetime();
}) | bind_on_next([=](int value) {
*sum += value;
}) | bind_on_done([=]() {
*doneGenerated = true;
}) | start(alive);
producer<no_value, int>([=](auto consumer) {
consumer.put_error(4);
return lifetime();
}) | bind_on_error([=](int value) {
*sum += value;
}) | bind_on_done([=]() {
*doneGenerated = false;
}) | start(alive);
}
REQUIRE(*sum == 1 + 2 + 3 + 4);
REQUIRE(*doneGenerated);
}
SECTION("on_next, on_error, on_done") {
auto sum = std::make_shared<int>(0);
auto dones = std::make_shared<int>(0);
{
auto alive = lifetime();
producer<int, int>([=](auto consumer) {
consumer.put_next(1);
consumer.put_done();
return lifetime();
}) | on_next([=](int value) {
*sum += value;
}) | start(alive);
producer<int, int>([=](auto consumer) {
consumer.put_next(11);
consumer.put_error(111);
return lifetime();
}) | on_error([=](int value) {
*sum += value;
}) | start(alive);
producer<int, int>([=](auto consumer) {
consumer.put_next(1111);
consumer.put_done();
return lifetime();
}) | on_done([=]() {
*dones += 1;
}) | start(alive);
producer<int, int>([=](auto consumer) {
consumer.put_next(11111);
consumer.put_next(11112);
consumer.put_next(11113);
consumer.put_error(11114);
return lifetime();
}) | on_next([=](int value) {
*sum += value;
}) | on_error([=](int value) {
*sum += value;
}) | start(alive);
}
auto alive = lifetime();
producer<int, int>([=](auto consumer) {
consumer.put_next(111111);
consumer.put_next(111112);
consumer.put_next(111113);
consumer.put_done();
return lifetime();
}) | on_next([=](int value) {
*sum += value;
}) | on_done([=]() {
*dones += 11;
}) | start(alive);
producer<int, int>([=](auto consumer) {
consumer.put_error(1111111);
return lifetime();
}) | on_error([=](int value) {
*sum += value;
}) | on_done([=]() {
*dones = 0;
}) | start(alive);
REQUIRE(*sum ==
1 +
111 +
11111 + 11112 + 11113 + 11114 +
111111 + 111112 + 111113 +
1111111);
REQUIRE(*dones == 1 + 11);
}
SECTION("on_next should copy its callback") {
auto sum = std::make_shared<int>(0);
{
auto next = [=](int value) {
REQUIRE(sum != nullptr);
*sum += value;
};
for (int i = 0; i != 3; ++i) {
auto alive = lifetime();
producer<int, int>([=](auto consumer) {
consumer.put_next(1);
consumer.put_done();
return lifetime();
})
| on_next(next)
| start(alive);
}
}
REQUIRE(*sum == 3);
}
}