Add rpl::merge().

This commit is contained in:
John Preston 2017-10-20 19:18:26 +03:00
parent 54cc3e6315
commit 7b69282c7e
4 changed files with 176 additions and 13 deletions
Telegram
SourceFiles/rpl
gyp/tests

View File

@ -82,8 +82,9 @@ public:
}
}
}
}, [consumer = _consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer = _consumer](auto &&error) {
consumer.put_error_forward(
std::forward<decltype(error)>(error));
}, [consumer = _consumer, state = _state] {
if (!--state->working) {
consumer.put_done();
@ -109,11 +110,7 @@ inline void combine_subscribe(
std::index_sequence<I...>,
std::tuple<producer<Values, Errors, Generators>...> &&saved) {
auto consume = { (
details::combine_subscribe_one<
I,
consumer_type,
Values...
>(
combine_subscribe_one<I, consumer_type, Values...>(
consumer,
state
).subscribe(std::get<I>(std::move(saved))), 0)... };
@ -137,7 +134,7 @@ template <
class combine_implementation_helper<producer<Values, Errors, Generators>...> {
public:
using CombinedValue = std::tuple<Values...>;
using CombinedError = details::normalized_variant_t<Errors...>;
using CombinedError = normalized_variant_t<Errors...>;
combine_implementation_helper(
producer<Values, Errors, Generators> &&...producers)
@ -147,9 +144,9 @@ public:
template <typename Handlers>
lifetime operator()(const consumer<CombinedValue, CombinedError, Handlers> &consumer) {
auto state = consumer.template make_state<
details::combine_state<Values...>>();
combine_state<Values...>>();
constexpr auto kArity = sizeof...(Values);
details::combine_subscribe(
combine_subscribe(
consumer,
state,
std::make_index_sequence<kArity>(),
@ -170,7 +167,7 @@ template <
inline auto combine_implementation(
producer<Values, Errors, Generators> &&...producers) {
using CombinedValue = std::tuple<Values...>;
using CombinedError = details::normalized_variant_t<Errors...>;
using CombinedError = normalized_variant_t<Errors...>;
return make_producer<CombinedValue, CombinedError>(
make_combine_implementation_helper(std::move(producers)...));
@ -333,8 +330,9 @@ inline auto combine(
consumer.put_next_copy(state->latest);
}
}
}, [consumer](Error &&error) {
consumer.put_error(std::move(error));
}, [consumer](auto &&error) {
consumer.put_error_forward(
std::forward<decltype(error)>(error));
}, [consumer, state] {
if (!--state->working) {
consumer.put_done();

View File

@ -0,0 +1,163 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include <rpl/producer.h>
#include <xutility>
namespace rpl {
namespace details {
struct merge_state {
merge_state(int working) : working(working) {
}
int working = 0;
};
template <size_t Index, typename consumer_type>
class merge_subscribe_one {
public:
merge_subscribe_one(
const consumer_type &consumer,
merge_state *state)
: _consumer(consumer)
, _state(state) {
}
template <typename Value, typename Error, typename Generator>
void subscribe(producer<Value, Error, Generator> &&producer) {
_consumer.add_lifetime(std::move(producer).start(
[consumer = _consumer](auto &&value) {
consumer.put_next_forward(
std::forward<decltype(value)>(value));
}, [consumer = _consumer](auto &&error) {
consumer.put_error_forward(
std::forward<decltype(error)>(error));
}, [consumer = _consumer, state = _state] {
if (!--state->working) {
consumer.put_done();
}
}));
}
private:
const consumer_type &_consumer;
merge_state *_state = nullptr;
};
template <
typename consumer_type,
typename Value,
typename Error,
typename ...Generators,
std::size_t ...I>
inline void merge_subscribe(
const consumer_type &consumer,
merge_state *state,
std::index_sequence<I...>,
std::tuple<producer<Value, Error, Generators>...> &&saved) {
auto consume = { (
details::merge_subscribe_one<I, consumer_type>(
consumer,
state
).subscribe(std::get<I>(std::move(saved))), 0)... };
(void)consume;
}
template <typename ...Producers>
class merge_implementation_helper;
template <typename ...Producers>
merge_implementation_helper<std::decay_t<Producers>...>
make_merge_implementation_helper(Producers &&...producers) {
return merge_implementation_helper<std::decay_t<Producers>...>(
std::forward<Producers>(producers)...);
}
template <
typename Value,
typename Error,
typename ...Generators>
class merge_implementation_helper<producer<Value, Error, Generators>...> {
public:
merge_implementation_helper(
producer<Value, Error, Generators> &&...producers)
: _saved(std::make_tuple(std::move(producers)...)) {
}
template <typename Handlers>
lifetime operator()(const consumer<Value, Error, Handlers> &consumer) {
auto state = consumer.template make_state<
details::merge_state>(sizeof...(Generators));
constexpr auto kArity = sizeof...(Generators);
details::merge_subscribe(
consumer,
state,
std::make_index_sequence<kArity>(),
std::move(_saved));
return lifetime();
}
private:
std::tuple<producer<Value, Error, Generators>...> _saved;
};
template <
typename Value,
typename Error,
typename ...Generators>
inline auto merge_implementation(
producer<Value, Error, Generators> &&...producers) {
return make_producer<Value, Error>(
make_merge_implementation_helper(std::move(producers)...));
}
template <typename ...Args>
struct merge_producers : std::false_type {
};
template <typename ...Args>
constexpr bool merge_producers_v
= merge_producers<Args...>::value;
template <
typename Value,
typename Error,
typename ...Generators>
struct merge_producers<
producer<Value, Error, Generators>...>
: std::true_type {
};
} // namespace details
template <
typename ...Args,
typename = std::enable_if_t<
details::merge_producers_v<Args...>>>
inline decltype(auto) merge(Args &&...args) {
return details::merge_implementation(std::move(args)...);
}
} // namespace rpl

View File

@ -35,6 +35,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include <rpl/deferred.h>
#include <rpl/map.h>
#include <rpl/mappers.h>
#include <rpl/merge.h>
#include <rpl/filter.h>
#include <rpl/distinct_until_changed.h>
#include <rpl/type_erased.h>

View File

@ -115,6 +115,7 @@
'<(src_loc)/rpl/lifetime.h',
'<(src_loc)/rpl/map.h',
'<(src_loc)/rpl/mappers.h',
'<(src_loc)/rpl/merge.h',
'<(src_loc)/rpl/never.h',
'<(src_loc)/rpl/operators_tests.cpp',
'<(src_loc)/rpl/producer.h',