diff --git a/Telegram/SourceFiles/rpl/before_next.h b/Telegram/SourceFiles/rpl/before_next.h
new file mode 100644
index 000000000..f401631ea
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/before_next.h
@@ -0,0 +1,37 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+#include <rpl/filter.h>
+
+namespace rpl {
+
+template <typename SideEffect>
+auto before_next(SideEffect &&method) {
+	return filter([method = std::forward<SideEffect>(method)](
+			const auto &value) {
+		method(value);
+		return true;
+	});
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/complete.h b/Telegram/SourceFiles/rpl/complete.h
new file mode 100644
index 000000000..18675590d
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/complete.h
@@ -0,0 +1,35 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+
+template <typename Value = empty_value, typename Error = no_error>
+producer<Value, Error> complete() {
+	return [](const consumer<Value, Error> &consumer) mutable {
+		consumer.put_done();
+		return lifetime();
+	};
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/consumer.h b/Telegram/SourceFiles/rpl/consumer.h
index 17aa4caef..38366a5da 100644
--- a/Telegram/SourceFiles/rpl/consumer.h
+++ b/Telegram/SourceFiles/rpl/consumer.h
@@ -20,8 +20,9 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
 */
 #pragma once
 
-#include "rpl/lifetime.h"
 #include <mutex>
+#include <gsl/gsl_assert>
+#include <rpl/lifetime.h>
 
 namespace rpl {
 
@@ -36,7 +37,10 @@ struct no_error {
 struct empty_value {
 };
 
-template <typename Value, typename Error>
+struct empty_error {
+};
+
+template <typename Value = empty_value, typename Error = no_error>
 class consumer {
 public:
 	template <
@@ -57,7 +61,11 @@ public:
 	void put_error_copy(const Error &error) const;
 	void put_done() const;
 
-	void set_lifetime(lifetime &&lifetime) const;
+	void add_lifetime(lifetime &&lifetime) const;
+
+	template <typename Type, typename... Args>
+	Type *make_state(Args&& ...args) const;
+	
 	void terminate() const;
 
 	bool operator==(const consumer &other) const {
@@ -103,7 +111,11 @@ public:
 	virtual void put_error(Error &&error) = 0;
 	virtual void put_done() = 0;
 
-	void set_lifetime(lifetime &&lifetime);
+	void add_lifetime(lifetime &&lifetime);
+
+	template <typename Type, typename... Args>
+	Type *make_state(Args&& ...args);
+
 	void terminate();
 
 protected:
@@ -210,14 +222,21 @@ void consumer<Value, Error>::put_done() const {
 }
 
 template <typename Value, typename Error>
-void consumer<Value, Error>::set_lifetime(lifetime &&lifetime) const {
+void consumer<Value, Error>::add_lifetime(lifetime &&lifetime) const {
 	if (_instance) {
-		_instance->set_lifetime(std::move(lifetime));
+		_instance->add_lifetime(std::move(lifetime));
 	} else {
 		lifetime.destroy();
 	}
 }
 
+template <typename Value, typename Error>
+template <typename Type, typename... Args>
+Type *consumer<Value, Error>::make_state(Args&& ...args) const {
+	Expects(_instance != nullptr);
+	return _instance->template make_state<Type>(std::forward<Args>(args)...);
+}
+
 template <typename Value, typename Error>
 void consumer<Value, Error>::terminate() const {
 	if (_instance) {
@@ -226,7 +245,7 @@ void consumer<Value, Error>::terminate() const {
 }
 
 template <typename Value, typename Error>
-void consumer<Value, Error>::abstract_consumer_instance::set_lifetime(
+void consumer<Value, Error>::abstract_consumer_instance::add_lifetime(
 		lifetime &&lifetime) {
 	std::unique_lock<std::mutex> lock(_mutex);
 	if (_terminated) {
@@ -234,10 +253,19 @@ void consumer<Value, Error>::abstract_consumer_instance::set_lifetime(
 
 		lifetime.destroy();
 	} else {
-		_lifetime = std::move(lifetime);
+		_lifetime.add(std::move(lifetime));
 	}
 }
 
+template <typename Value, typename Error>
+template <typename Type, typename... Args>
+Type *consumer<Value, Error>::abstract_consumer_instance::make_state(
+		Args&& ...args) {
+	std::unique_lock<std::mutex> lock(_mutex);
+	Expects(!_terminated);
+	return _lifetime.template make_state<Type>(std::forward<Args>(args)...);
+}
+
 template <typename Value, typename Error>
 void consumer<Value, Error>::abstract_consumer_instance::terminate() {
 	std::unique_lock<std::mutex> lock(_mutex);
diff --git a/Telegram/SourceFiles/rpl/deferred.h b/Telegram/SourceFiles/rpl/deferred.h
new file mode 100644
index 000000000..eefff2162
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/deferred.h
@@ -0,0 +1,38 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+
+template <
+	typename Creator,
+	typename Value = typename decltype(std::declval<Creator>()())::value_type,
+	typename Error = typename decltype(std::declval<Creator>()())::error_type>
+producer<Value, Error> deferred(Creator &&creator) {
+	return [creator = std::forward<Creator>(creator)](
+			const consumer<Value, Error> &consumer) mutable {
+		return std::move(creator)().start_existing(consumer);
+	};
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/distinct_until_changed.h b/Telegram/SourceFiles/rpl/distinct_until_changed.h
new file mode 100644
index 000000000..10b774eae
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/distinct_until_changed.h
@@ -0,0 +1,62 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+#include "base/optional.h"
+
+namespace rpl {
+namespace details {
+
+class distinct_until_changed_helper {
+public:
+	template <typename Value, typename Error>
+	rpl::producer<Value, Error> operator()(
+			rpl::producer<Value, Error> &&initial) const {
+		return [initial = std::move(initial)](
+				const consumer<Value, Error> &consumer) mutable {
+			auto previous = consumer.make_state<
+				base::optional<Value>
+			>();
+			return std::move(initial).start(
+				[consumer, previous](Value &&value) {
+					if (!(*previous) || (**previous) != value) {
+						*previous = value;
+						consumer.put_next(std::move(value));
+					}
+				}, [consumer](Error &&error) {
+					consumer.put_error(std::move(error));
+				}, [consumer] {
+					consumer.put_done();
+				});
+		};
+	}
+
+};
+
+} // namespace details
+
+inline auto distinct_until_changed()
+-> details::distinct_until_changed_helper {
+	return details::distinct_until_changed_helper();
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/event_stream.h b/Telegram/SourceFiles/rpl/event_stream.h
index bceb7750c..0d17925f7 100644
--- a/Telegram/SourceFiles/rpl/event_stream.h
+++ b/Telegram/SourceFiles/rpl/event_stream.h
@@ -20,36 +20,50 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
 */
 #pragma once
 
-#include "producer.h"
+#include <rpl/producer.h>
+#include <rpl/single.h>
+#include <rpl/then.h>
 #include "base/algorithm.h"
 #include "base/assertion.h"
 #include "base/index_based_iterator.h"
 
 namespace rpl {
 
-template <typename Value>
+// Currently not thread-safe :(
+
+template <typename Value = empty_value>
 class event_stream {
 public:
 	event_stream();
 	event_stream(event_stream &&other);
 
 	void fire(Value &&value);
+	void fire_copy(const Value &value) {
+		auto copy = value;
+		fire(std::move(copy));
+	}
 	producer<Value, no_error> events() const;
+	producer<Value, no_error> events_starting_with(
+			Value &&value) const {
+		return single(std::move(value)) | then(events());
+	}
+	producer<Value, no_error> events_starting_with_copy(
+			const Value &value) const {
+		auto copy = value;
+		return events_starting_with(std::move(copy));
+	}
 
 	~event_stream();
 
 private:
-	std::weak_ptr<std::vector<consumer<Value, no_error>>> weak() const {
-		return _consumers;
-	}
+	std::weak_ptr<std::vector<consumer<Value, no_error>>> weak() const;
 
-	std::shared_ptr<std::vector<consumer<Value, no_error>>> _consumers;
+	mutable std::shared_ptr<std::vector<consumer<Value, no_error>>> _consumers;
 
 };
 
 template <typename Value>
-event_stream<Value>::event_stream()
-	: _consumers(std::make_shared<std::vector<consumer<Value, no_error>>>()) {
+event_stream<Value>::event_stream() {
 }
 
 template <typename Value>
@@ -59,7 +73,10 @@ event_stream<Value>::event_stream(event_stream &&other)
 
 template <typename Value>
 void event_stream<Value>::fire(Value &&value) {
-	Expects(_consumers != nullptr);
+	if (!_consumers) {
+		return;
+	}
+
 	auto &consumers = *_consumers;
 	auto begin = base::index_based_begin(consumers);
 	auto end = base::index_based_end(consumers);
@@ -97,7 +114,8 @@ void event_stream<Value>::fire(Value &&value) {
 
 template <typename Value>
 producer<Value, no_error> event_stream<Value>::events() const {
-	return producer<Value, no_error>([weak = weak()](consumer<Value, no_error> consumer) {
+	return producer<Value, no_error>([weak = weak()](
+			const consumer<Value, no_error> &consumer) {
 		if (auto strong = weak.lock()) {
 			auto result = [weak, consumer] {
 				if (auto strong = weak.lock()) {
@@ -114,6 +132,15 @@ producer<Value, no_error> event_stream<Value>::events() const {
 	});
 }
 
+template <typename Value>
+std::weak_ptr<std::vector<consumer<Value, no_error>>> event_stream<Value>::weak() const {
+	if (!_consumers) {
+		_consumers = std::make_shared<std::vector<consumer<Value, no_error>>>();
+	}
+	return _consumers;
+}
+
+
 template <typename Value>
 event_stream<Value>::~event_stream() {
 	if (_consumers) {
@@ -123,4 +150,11 @@ event_stream<Value>::~event_stream() {
 	}
 }
 
+template <typename Value>
+inline auto to_stream(event_stream<Value> &stream) {
+	return on_next([&stream](Value &&value) {
+		stream.fire(std::move(value));
+	});
+}
+
 } // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/fail.h b/Telegram/SourceFiles/rpl/fail.h
new file mode 100644
index 000000000..06c851f07
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/fail.h
@@ -0,0 +1,37 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+
+template <typename Value, typename Error>
+producer<Value, std::decay_t<Error>> fail(Error &&error) {
+	using consumer_t = consumer<Value, std::decay_t<Error>>;
+	return [error = std::forward<Error>(error)](
+			const consumer_t &consumer) mutable {
+		consumer.put_error(std::move(error));
+		return lifetime();
+	};
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/filter.h b/Telegram/SourceFiles/rpl/filter.h
new file mode 100644
index 000000000..ff6cc49b4
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/filter.h
@@ -0,0 +1,75 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+namespace details {
+
+template <typename Predicate>
+class filter_helper {
+public:
+	template <typename OtherPredicate>
+	filter_helper(OtherPredicate &&predicate)
+		: _predicate(std::forward<OtherPredicate>(predicate)) {
+	}
+
+	template <typename Value, typename Error>
+	rpl::producer<Value, Error> operator()(
+			rpl::producer<Value, Error> &&initial) {
+		return [
+				initial = std::move(initial),
+				predicate = std::move(_predicate)
+			](
+				const consumer<Value, Error> &consumer) mutable {
+			return std::move(initial).start(
+				[
+					consumer,
+					predicate = std::move(predicate)
+				](Value &&value) {
+					const auto &immutable = value;
+					if (predicate(immutable)) {
+						consumer.put_next(std::move(value));
+					}
+				}, [consumer](Error &&error) {
+					consumer.put_error(std::move(error));
+				}, [consumer] {
+					consumer.put_done();
+				});
+		};
+	}
+
+private:
+	Predicate _predicate;
+
+};
+
+} // namespace details
+
+template <typename Predicate>
+auto filter(Predicate &&predicate)
+-> details::filter_helper<std::decay_t<Predicate>> {
+	return details::filter_helper<std::decay_t<Predicate>>(
+		std::forward<Predicate>(predicate));
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/flatten_latest.h b/Telegram/SourceFiles/rpl/flatten_latest.h
new file mode 100644
index 000000000..b237c8d7d
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/flatten_latest.h
@@ -0,0 +1,82 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+namespace details {
+
+class flatten_latest_helper {
+public:
+	template <typename Value, typename Error>
+	rpl::producer<Value, Error> operator()(
+			rpl::producer<
+				rpl::producer<Value, Error>,
+				Error
+			> &&initial) const {
+		return [initial = std::move(initial)](
+				const consumer<Value, Error> &consumer) mutable {
+			auto state = std::make_shared<State>();
+			return std::move(initial).start(
+			[consumer, state](rpl::producer<Value, Error> &&inner) {
+				state->finished = false;
+				state->alive = std::move(inner).start(
+				[consumer](Value &&value) {
+					consumer.put_next(std::move(value));
+				}, [consumer](Error &&error) {
+					consumer.put_error(std::move(error));
+				}, [consumer, state] {
+					if (state->finished) {
+						consumer.put_done();
+					} else {
+						state->finished = true;
+					}
+				});
+			}, [consumer](Error &&error) {
+				consumer.put_error(std::move(error));
+			}, [consumer, state] {
+				if (state->finished) {
+					consumer.put_done();
+				} else {
+					state->finished = true;
+				}
+			});
+		};
+	}
+
+private:
+	struct State {
+		lifetime alive;
+		bool finished = false;
+	};
+
+};
+
+} // namespace details
+
+inline auto flatten_latest()
+-> details::flatten_latest_helper {
+	return details::flatten_latest_helper();
+}
+
+} // namespace rpl
+
diff --git a/Telegram/SourceFiles/rpl/map.h b/Telegram/SourceFiles/rpl/map.h
new file mode 100644
index 000000000..1e7f28dd9
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/map.h
@@ -0,0 +1,76 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+namespace details {
+
+template <typename Transform>
+class map_helper {
+public:
+	template <typename OtherTransform>
+	map_helper(OtherTransform &&transform)
+		: _transform(std::forward<OtherTransform>(transform)) {
+	}
+
+	template <
+		typename Value,
+		typename Error,
+		typename NewValue = decltype(
+			std::declval<Transform>()(std::declval<Value>())
+		)>
+	rpl::producer<NewValue, Error> operator()(
+			rpl::producer<Value, Error> &&initial) {
+		return [
+			initial = std::move(initial),
+			transform = std::move(_transform)
+		](const consumer<NewValue, Error> &consumer) mutable {
+			return std::move(initial).start(
+				[
+					consumer,
+					transform = std::move(transform)
+				](Value &&value) {
+				consumer.put_next(transform(std::move(value)));
+			}, [consumer](Error &&error) {
+				consumer.put_error(std::move(error));
+			}, [consumer] {
+				consumer.put_done();
+			});
+		};
+	}
+
+private:
+	Transform _transform;
+
+};
+
+} // namespace details
+
+template <typename Transform>
+auto map(Transform &&transform)
+-> details::map_helper<std::decay_t<Transform>> {
+	return details::map_helper<std::decay_t<Transform>>(
+		std::forward<Transform>(transform));
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/never.h b/Telegram/SourceFiles/rpl/never.h
new file mode 100644
index 000000000..c3d32c1dd
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/never.h
@@ -0,0 +1,34 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+
+template <typename Value = empty_value, typename Error = no_error>
+producer<Value, Error> never() {
+	return [](const consumer<Value, Error> &consumer) mutable {
+		return lifetime();
+	};
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/operators_tests.cpp b/Telegram/SourceFiles/rpl/operators_tests.cpp
new file mode 100644
index 000000000..0f1c43ce5
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/operators_tests.cpp
@@ -0,0 +1,248 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#include "catch.hpp"
+
+#include <rpl/rpl.h>
+#include <string>
+
+using namespace rpl;
+
+class OnDestructor {
+public:
+	OnDestructor(base::lambda_once<void()> callback)
+		: _callback(std::move(callback)) {
+	}
+	~OnDestructor() {
+		if (_callback) {
+			_callback();
+		}
+	}
+
+private:
+	base::lambda_once<void()> _callback;
+
+};
+
+class InvokeCounter {
+public:
+	InvokeCounter(
+		const std::shared_ptr<int> &copyCounter,
+		const std::shared_ptr<int> &moveCounter)
+		: _copyCounter(copyCounter)
+		, _moveCounter(moveCounter) {
+	}
+	InvokeCounter(const InvokeCounter &other)
+		: _copyCounter(other._copyCounter)
+		, _moveCounter(other._moveCounter) {
+		if (_copyCounter) {
+			++*_copyCounter;
+		}
+	}
+	InvokeCounter(InvokeCounter &&other)
+		: _copyCounter(base::take(other._copyCounter))
+		, _moveCounter(base::take(other._moveCounter)) {
+		if (_moveCounter) {
+			++*_moveCounter;
+		}
+	}
+	InvokeCounter &operator=(const InvokeCounter &other) {
+		_copyCounter = other._copyCounter;
+		_moveCounter = other._moveCounter;
+		if (_copyCounter) {
+			++*_copyCounter;
+		}
+	}
+	InvokeCounter &operator=(InvokeCounter &&other) {
+		_copyCounter = base::take(other._copyCounter);
+		_moveCounter = base::take(other._moveCounter);
+		if (_moveCounter) {
+			++*_moveCounter;
+		}
+	}
+
+private:
+	std::shared_ptr<int> _copyCounter;
+	std::shared_ptr<int> _moveCounter;
+
+};
+
+TEST_CASE("basic operators tests", "[rpl::operators]") {
+	SECTION("single test") {
+		auto sum = std::make_shared<int>(0);
+		auto doneGenerated = std::make_shared<bool>(false);
+		auto destroyed = std::make_shared<bool>(false);
+		auto copyCount = std::make_shared<int>(0);
+		auto moveCount = std::make_shared<int>(0);
+		{
+			InvokeCounter counter(copyCount, moveCount);
+			auto destroyCalled = std::make_shared<OnDestructor>([=] {
+				*destroyed = true;
+			});
+			rpl::lifetime lifetime;
+			single(std::move(counter))
+				| on_next([=](InvokeCounter&&) {
+					(void)destroyCalled;
+					++*sum;
+				}) | on_error([=](no_error) {
+					(void)destroyCalled;
+				}) | on_done([=] {
+					(void)destroyCalled;
+					*doneGenerated = true;
+				}) | start(lifetime);
+		}
+		REQUIRE(*sum == 1);
+		REQUIRE(*doneGenerated);
+		REQUIRE(*destroyed);
+		REQUIRE(*copyCount == 0);
+	}
+
+	SECTION("then test") {
+		auto sum = std::make_shared<int>(0);
+		auto doneGenerated = std::make_shared<bool>(false);
+		auto destroyed = std::make_shared<bool>(false);
+		auto copyCount = std::make_shared<int>(0);
+		auto moveCount = std::make_shared<int>(0);
+		{
+			auto testing = complete<InvokeCounter>();
+			for (auto i = 0; i != 5; ++i) {
+				InvokeCounter counter(copyCount, moveCount);
+				testing = std::move(testing)
+					| then(single(std::move(counter)));
+			}
+			auto destroyCalled = std::make_shared<OnDestructor>([=] {
+				*destroyed = true;
+			});
+
+			rpl::lifetime lifetime;
+			std::move(testing)
+			| then(complete<InvokeCounter>())
+			| on_next([=](InvokeCounter&&) {
+				(void)destroyCalled;
+				++*sum;
+			}) | on_error([=](no_error) {
+				(void)destroyCalled;
+			}) | on_done([=] {
+				(void)destroyCalled;
+				*doneGenerated = true;
+			}) | start(lifetime);
+		}
+		REQUIRE(*sum == 5);
+		REQUIRE(*doneGenerated);
+		REQUIRE(*destroyed);
+		REQUIRE(*copyCount == 0);
+	}
+
+	SECTION("map test") {
+		auto sum = std::make_shared<std::string>("");
+		{
+			rpl::lifetime lifetime;
+			single(1)
+				| then(single(2))
+				| then(single(3))
+				| then(single(4))
+				| then(single(5))
+				| map([](int value) {
+				return std::to_string(value);
+			}) | on_next([=](std::string &&value) {
+				*sum += std::move(value) + ' ';
+			}) | start(lifetime);
+		}
+		REQUIRE(*sum == "1 2 3 4 5 ");
+	}
+
+	SECTION("deferred test") {
+		auto launched = std::make_shared<int>(0);
+		auto checked = std::make_shared<int>(0);
+		{
+			rpl::lifetime lifetime;
+			auto make_next = [=] {
+				return deferred([=] {
+					return single(++*launched);
+				});
+			};
+			make_next()
+				| then(make_next())
+				| then(make_next())
+				| then(make_next())
+				| then(make_next())
+				| on_next([=](int value) {
+					REQUIRE(++*checked == *launched);
+					REQUIRE(*checked == value);
+				}) | start(lifetime);
+			REQUIRE(*launched == 5);
+		}
+	}
+
+	SECTION("filter test") {
+		auto sum = std::make_shared<std::string>("");
+		{
+			rpl::lifetime lifetime;
+			single(1)
+				| then(single(1))
+				| then(single(2))
+				| then(single(2))
+				| then(single(3))
+				| filter([](int value) { return value != 2; })
+				| map([](int value) {
+					return std::to_string(value);
+				}) | on_next([=](std::string &&value) {
+					*sum += std::move(value) + ' ';
+				}) | start(lifetime);
+		}
+		REQUIRE(*sum == "1 1 3 ");
+	}
+
+	SECTION("distinct_until_changed test") {
+		auto sum = std::make_shared<std::string>("");
+		{
+			rpl::lifetime lifetime;
+			single(1)
+				| then(single(1))
+				| then(single(2))
+				| then(single(2))
+				| then(single(3))
+				| distinct_until_changed()
+				| map([](int value) {
+				return std::to_string(value);
+			}) | on_next([=](std::string &&value) {
+				*sum += std::move(value) + ' ';
+			}) | start(lifetime);
+		}
+		REQUIRE(*sum == "1 2 3 ");
+	}
+
+	SECTION("flatten_latest test") {
+		auto sum = std::make_shared<std::string>("");
+		{
+			rpl::lifetime lifetime;
+			single(single(1) | then(single(2)))
+				| then(single(single(3) | then(single(4))))
+				| then(single(single(5) | then(single(6))))
+				| flatten_latest()
+				| map([](int value) {
+				return std::to_string(value);
+			}) | on_next([=](std::string &&value) {
+				*sum += std::move(value) + ' ';
+			}) | start(lifetime);
+		}
+		REQUIRE(*sum == "1 2 3 4 5 6 ");
+	}
+}
diff --git a/Telegram/SourceFiles/rpl/producer.h b/Telegram/SourceFiles/rpl/producer.h
index b12bad7d1..80dc063e4 100644
--- a/Telegram/SourceFiles/rpl/producer.h
+++ b/Telegram/SourceFiles/rpl/producer.h
@@ -21,19 +21,70 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
 #pragma once
 
 #include "base/lambda.h"
-#include "rpl/consumer.h"
-#include "rpl/lifetime.h"
+#include <rpl/consumer.h>
+#include <rpl/lifetime.h>
 
 namespace rpl {
+namespace details {
 
-template <typename Value, typename Error = no_error>
+template <typename Lambda>
+class mutable_lambda_wrap {
+public:
+	mutable_lambda_wrap(Lambda &&lambda)
+		: _lambda(std::move(lambda)) {
+	}
+
+	template <typename... Args>
+	auto operator()(Args&&... args) const {
+		return (const_cast<mutable_lambda_wrap*>(this)->_lambda)(
+			std::forward<Args>(args)...);
+	}
+
+private:
+	Lambda _lambda;
+
+};
+
+// Type-erased copyable mutable lambda using base::lambda.
+template <typename Function> class mutable_lambda;
+
+template <typename Return, typename ...Args>
+class mutable_lambda<Return(Args...)> {
+public:
+
+	// Copy / move construct / assign from an arbitrary type.
+	template <
+		typename Lambda,
+		typename = std::enable_if_t<std::is_convertible<
+			decltype(std::declval<Lambda>()(std::declval<Args>()...)),
+			Return
+		>::value>>
+	mutable_lambda(Lambda other) : _implementation(mutable_lambda_wrap<Lambda>(std::move(other))) {
+	}
+
+	template <
+		typename ...OtherArgs,
+		typename = std::enable_if_t<(sizeof...(Args) == sizeof...(OtherArgs))>>
+	Return operator()(OtherArgs&&... args) {
+		return _implementation(std::forward<OtherArgs>(args)...);
+	}
+
+private:
+	base::lambda<Return(Args...)> _implementation;
+
+};
+
+} // namespace details
+
+template <typename Value = empty_value, typename Error = no_error>
 class producer {
 public:
 	using value_type = Value;
 	using error_type = Error;
+	using consumer_type = consumer<Value, Error>;
 
 	template <typename Generator, typename = std::enable_if<std::is_convertible<
-		decltype(std::declval<Generator>()(std::declval<consumer<Value, Error>>())),
+		decltype(std::declval<Generator>()(std::declval<consumer_type>())),
 		lifetime
 	>::value>>
 	producer(Generator &&generator);
@@ -48,10 +99,25 @@ public:
 	lifetime start(
 		OnNext &&next,
 		OnError &&error,
-		OnDone &&done) const;
+		OnDone &&done) &&;
+
+	template <
+		typename OnNext,
+		typename OnError,
+		typename OnDone,
+		typename = decltype(std::declval<OnNext>()(std::declval<Value>())),
+		typename = decltype(std::declval<OnError>()(std::declval<Error>())),
+		typename = decltype(std::declval<OnDone>()())>
+	lifetime start_copy(
+		OnNext &&next,
+		OnError &&error,
+		OnDone &&done) const &;
+
+	lifetime start_existing(const consumer_type &consumer) &&;
 
 private:
-	base::lambda<lifetime(consumer<Value, Error>)> _generator;
+	details::mutable_lambda<
+		lifetime(const consumer_type &)> _generator;
 
 };
 
@@ -72,13 +138,37 @@ template <
 lifetime producer<Value, Error>::start(
 		OnNext &&next,
 		OnError &&error,
-		OnDone &&done) const {
-	auto result = consumer<Value, Error>(
+		OnDone &&done) && {
+	return std::move(*this).start_existing(consumer<Value, Error>(
+		std::forward<OnNext>(next),
+		std::forward<OnError>(error),
+		std::forward<OnDone>(done)));
+}
+
+template <typename Value, typename Error>
+template <
+	typename OnNext,
+	typename OnError,
+	typename OnDone,
+	typename,
+	typename,
+	typename>
+lifetime producer<Value, Error>::start_copy(
+		OnNext &&next,
+		OnError &&error,
+		OnDone &&done) const & {
+	auto copy = *this;
+	return std::move(copy).start(
 		std::forward<OnNext>(next),
 		std::forward<OnError>(error),
 		std::forward<OnDone>(done));
-	result.set_lifetime(_generator(result));
-	return [result] { result.terminate(); };
+}
+
+template <typename Value, typename Error>
+lifetime producer<Value, Error>::start_existing(
+		const consumer_type &consumer) && {
+	consumer.add_lifetime(std::move(_generator)(consumer));
+	return [consumer] { consumer.terminate(); };
 }
 
 template <typename Value, typename Error>
@@ -91,21 +181,21 @@ template <
 	typename Error,
 	typename Method,
 	typename = decltype(std::declval<Method>()(std::declval<producer<Value, Error>>()))>
-inline decltype(auto) operator|(producer<Value, Error> &&producer, Method &&method) {
+inline auto operator|(producer<Value, Error> &&producer, Method &&method) {
 	return std::forward<Method>(method)(std::move(producer));
 }
 
 template <typename OnNext>
-inline decltype(auto) bind_on_next(OnNext &&handler) {
+inline auto bind_on_next(OnNext &&handler) {
 	return [handler = std::forward<OnNext>(handler)](auto &&existing) mutable {
 		using value_type = typename std::decay_t<decltype(existing)>::value_type;
 		using error_type = typename std::decay_t<decltype(existing)>::error_type;
 		return producer<no_value, error_type>([
 			existing = std::move(existing),
-			handler = std::forward<OnNext>(handler)
-		](consumer<no_value, error_type> consumer) {
-			return existing.start([handler = std::decay_t<OnNext>(handler)](
-				value_type &&value) {
+			handler = std::move(handler)
+		](const consumer<no_value, error_type> &consumer) mutable {
+			return std::move(existing).start(
+			[handler = std::move(handler)](value_type &&value) {
 				handler(std::move(value));
 			}, [consumer](error_type &&error) {
 				consumer.put_error(std::move(error));
@@ -117,17 +207,18 @@ inline decltype(auto) bind_on_next(OnNext &&handler) {
 }
 
 template <typename OnError>
-inline decltype(auto) bind_on_error(OnError &&handler) {
+inline auto bind_on_error(OnError &&handler) {
 	return [handler = std::forward<OnError>(handler)](auto &&existing) mutable {
 		using value_type = typename std::decay_t<decltype(existing)>::value_type;
 		using error_type = typename std::decay_t<decltype(existing)>::error_type;
 		return producer<value_type, no_error>([
 			existing = std::move(existing),
-			handler = std::forward<OnError>(handler)
-		](consumer<value_type, no_error> consumer) {
-			return existing.start([consumer](value_type &&value) {
+			handler = std::move(handler)
+		](const consumer<value_type, no_error> &consumer) mutable {
+			return std::move(existing).start(
+			[consumer](value_type &&value) {
 				consumer.put_next(std::move(value));
-			}, [handler = std::decay_t<OnError>(handler)](error_type &&error) {
+			}, [handler = std::move(handler)](error_type &&error) {
 				handler(std::move(error));
 			}, [consumer] {
 				consumer.put_done();
@@ -137,19 +228,20 @@ inline decltype(auto) bind_on_error(OnError &&handler) {
 }
 
 template <typename OnDone>
-inline decltype(auto) bind_on_done(OnDone &&handler) {
+inline auto bind_on_done(OnDone &&handler) {
 	return [handler = std::forward<OnDone>(handler)](auto &&existing) mutable {
 		using value_type = typename std::decay_t<decltype(existing)>::value_type;
 		using error_type = typename std::decay_t<decltype(existing)>::error_type;
 		return producer<value_type, error_type>([
 			existing = std::move(existing),
-			handler = std::forward<OnDone>(handler)
-		](consumer<value_type, error_type> consumer) {
-			return existing.start([consumer](value_type &&value) {
+			handler = std::move(handler)
+		](const consumer<value_type, error_type> &consumer) mutable {
+			return std::move(existing).start(
+			[consumer](value_type &&value) {
 				consumer.put_next(std::move(value));
 			}, [consumer](error_type &&value) {
 				consumer.put_error(std::move(value));
-			}, [handler = std::decay_t<OnDone>(handler)] {
+			}, [handler = std::move(handler)] {
 				handler();
 			});
 		});
@@ -500,10 +592,11 @@ inline void operator|(
 			OnError,
 			OnDone> &&producer_with_next_error_done,
 		lifetime_holder &&lifetime) {
-	lifetime.alive_while.add(producer_with_next_error_done.producer.start(
-		std::move(producer_with_next_error_done.next),
-		std::move(producer_with_next_error_done.error),
-		std::move(producer_with_next_error_done.done)));
+	lifetime.alive_while.add(
+		std::move(producer_with_next_error_done.producer).start(
+			std::move(producer_with_next_error_done.next),
+			std::move(producer_with_next_error_done.error),
+			std::move(producer_with_next_error_done.done)));
 }
 
 template <typename Value, typename Error>
diff --git a/Telegram/SourceFiles/rpl/producer_tests.cpp b/Telegram/SourceFiles/rpl/producer_tests.cpp
index 5e8f52407..fcc9f6eae 100644
--- a/Telegram/SourceFiles/rpl/producer_tests.cpp
+++ b/Telegram/SourceFiles/rpl/producer_tests.cpp
@@ -20,8 +20,8 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
 */
 #include "catch.hpp"
 
-#include "rpl/producer.h"
-#include "rpl/event_stream.h"
+#include <rpl/producer.h>
+#include <rpl/event_stream.h>
 
 using namespace rpl;
 
@@ -52,7 +52,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
 				*destroyed = true;
 			});
 			{
-				producer<int, no_error>([=](auto consumer) {
+				producer<int, no_error>([=](auto &&consumer) {
 					(void)destroyCaller;
 					consumer.put_next(1);
 					consumer.put_next(2);
@@ -82,7 +82,7 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
 	SECTION("producer error test") {
 		auto errorGenerated = std::make_shared<bool>(false);
 		{
-			producer<no_value, bool>([=](auto consumer) {
+			producer<no_value, bool>([=](auto &&consumer) {
 				consumer.put_error(true);
 				return lifetime();
 			}).start([=](no_value) {
@@ -99,16 +99,16 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
 		{
 			auto lifetimes = lifetime();
 			{
-				auto testProducer = producer<no_value, no_error>([=](auto consumer) {
+				auto testProducer = producer<no_value, no_error>([=](auto &&consumer) {
 					return [=] {
 						++*lifetimeEndCount;
 					};
 				});
-				lifetimes.add(testProducer.start([=](no_value) {
+				lifetimes.add(testProducer.start_copy([=](no_value) {
 				}, [=](no_error) {
 				}, [=] {
 				}));
-				lifetimes.add(testProducer.start([=](no_value) {
+				lifetimes.add(std::move(testProducer).start([=](no_value) {
 				}, [=](no_error) {
 				}, [=] {
 				}));
@@ -123,8 +123,8 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
 		auto lifetimeEndCount = std::make_shared<int>(0);
 		auto saved = lifetime();
 		{
-			saved = producer<int, no_error>([=](auto consumer) {
-				auto inner = producer<int, no_error>([=](auto consumer) {
+			saved = producer<int, no_error>([=](auto &&consumer) {
+				auto inner = producer<int, no_error>([=](auto &&consumer) {
 					consumer.put_next(1);
 					consumer.put_next(2);
 					consumer.put_next(3);
@@ -135,12 +135,12 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
 				auto result = lifetime([=] {
 					++*lifetimeEndCount;
 				});
-				result.add(inner.start([=](int value) {
+				result.add(inner.start_copy([=](int value) {
 					consumer.put_next_copy(value);
 				}, [=](no_error) {
 				}, [=] {
 				}));
-				result.add(inner.start([=](int value) {
+				result.add(std::move(inner).start([=](int value) {
 					consumer.put_next_copy(value);
 				}, [=](no_error) {
 				}, [=] {
@@ -157,7 +157,9 @@ TEST_CASE("basic producer tests", "[rpl::producer]") {
 		saved.destroy();
 		REQUIRE(*lifetimeEndCount == 3);
 	}
+}
 
+TEST_CASE("basic event_streams tests", "[rpl::event_stream]") {
 	SECTION("event_stream basic test") {
 		auto sum = std::make_shared<int>(0);
 		event_stream<int> stream;
@@ -326,7 +328,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 		auto doneGenerated = std::make_shared<bool>(false);
 		{
 			auto alive = lifetime();
-			producer<int, no_error>([=](auto consumer) {
+			producer<int, no_error>([=](auto &&consumer) {
 				consumer.put_next(1);
 				consumer.put_next(2);
 				consumer.put_next(3);
@@ -338,7 +340,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 				*doneGenerated = true;
 			}) | start(alive);
 
-			producer<no_value, int>([=](auto consumer) {
+			producer<no_value, int>([=](auto &&consumer) {
 				consumer.put_error(4);
 				return lifetime();
 			}) | bind_on_error([=](int value) {
@@ -356,7 +358,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 		auto dones = std::make_shared<int>(0);
 		{
 			auto alive = lifetime();
-			producer<int, int>([=](auto consumer) {
+			producer<int, int>([=](auto &&consumer) {
 				consumer.put_next(1);
 				consumer.put_done();
 				return lifetime();
@@ -364,7 +366,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 				*sum += value;
 			}) | start(alive);
 
-			producer<int, int>([=](auto consumer) {
+			producer<int, int>([=](auto &&consumer) {
 				consumer.put_next(11);
 				consumer.put_error(111);
 				return lifetime();
@@ -372,7 +374,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 				*sum += value;
 			}) | start(alive);
 
-			producer<int, int>([=](auto consumer) {
+			producer<int, int>([=](auto &&consumer) {
 				consumer.put_next(1111);
 				consumer.put_done();
 				return lifetime();
@@ -380,7 +382,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 				*dones += 1;
 			}) | start(alive);
 
-			producer<int, int>([=](auto consumer) {
+			producer<int, int>([=](auto &&consumer) {
 				consumer.put_next(11111);
 				consumer.put_next(11112);
 				consumer.put_next(11113);
@@ -394,7 +396,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 		}
 
 		auto alive = lifetime();
-		producer<int, int>([=](auto consumer) {
+		producer<int, int>([=](auto &&consumer) {
 			consumer.put_next(111111);
 			consumer.put_next(111112);
 			consumer.put_next(111113);
@@ -406,7 +408,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 			*dones += 11;
 		}) | start(alive);
 
-		producer<int, int>([=](auto consumer) {
+		producer<int, int>([=](auto &&consumer) {
 			consumer.put_error(1111111);
 			return lifetime();
 		}) | on_error([=](int value) {
@@ -434,7 +436,7 @@ TEST_CASE("basic piping tests", "[rpl::producer]") {
 
 			for (int i = 0; i != 3; ++i) {
 				auto alive = lifetime();
-				producer<int, int>([=](auto consumer) {
+				producer<int, int>([=](auto &&consumer) {
 					consumer.put_next(1);
 					consumer.put_done();
 					return lifetime();
diff --git a/Telegram/SourceFiles/rpl/rpl.h b/Telegram/SourceFiles/rpl/rpl.h
new file mode 100644
index 000000000..7db93de01
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/rpl.h
@@ -0,0 +1,40 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/lifetime.h>
+#include <rpl/consumer.h>
+#include <rpl/producer.h>
+#include <rpl/event_stream.h>
+
+#include <rpl/single.h>
+#include <rpl/complete.h>
+#include <rpl/fail.h>
+#include <rpl/never.h>
+
+#include <rpl/then.h>
+#include <rpl/deferred.h>
+#include <rpl/map.h>
+#include <rpl/filter.h>
+#include <rpl/distinct_until_changed.h>
+#include <rpl/flatten_latest.h>
+
+#include <rpl/before_next.h>
diff --git a/Telegram/SourceFiles/rpl/single.h b/Telegram/SourceFiles/rpl/single.h
new file mode 100644
index 000000000..873131561
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/single.h
@@ -0,0 +1,46 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+
+template <typename Value, typename Error = no_error>
+producer<std::decay_t<Value>, Error> single(Value &&value) {
+	using consumer_t = consumer<std::decay_t<Value>, Error>;
+	return [value = std::forward<Value>(value)](
+			const consumer_t &consumer) mutable {
+		consumer.put_next(std::move(value));
+		consumer.put_done();
+		return lifetime();
+	};
+}
+
+template <typename Error = no_error>
+producer<empty_value, Error> single() {
+	return [](const consumer<empty_value, Error> &consumer) {
+		consumer.put_next({});
+		consumer.put_done();
+	};
+}
+
+} // namespace rpl
diff --git a/Telegram/SourceFiles/rpl/then.h b/Telegram/SourceFiles/rpl/then.h
new file mode 100644
index 000000000..a2be3627c
--- /dev/null
+++ b/Telegram/SourceFiles/rpl/then.h
@@ -0,0 +1,58 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop version of Telegram messaging app, see https://telegram.org
+
+Telegram Desktop is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+It is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+In addition, as a special exception, the copyright holders give permission
+to link the code of portions of this program with the OpenSSL library.
+
+Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
+Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
+*/
+#pragma once
+
+#include <rpl/producer.h>
+
+namespace rpl {
+
+template <typename Value, typename Error>
+auto then(producer<Value, Error> &&following) {
+	return [following = std::move(following)](
+			producer<Value, Error> &&initial) mutable
+			-> producer<Value, Error> {
+		return [
+			initial = std::move(initial),
+			following = std::move(following)
+		](const consumer<Value, Error> &consumer) mutable {
+			return std::move(initial).start(
+			[consumer](Value &&value) {
+				consumer.put_next(std::move(value));
+			}, [consumer](Error &&error) {
+				consumer.put_error(std::move(error));
+			}, [
+				consumer,
+				following = std::move(following)
+			]() mutable {
+				consumer.add_lifetime(std::move(following).start(
+				[consumer](Value &&value) {
+					consumer.put_next(std::move(value));
+				}, [consumer](Error &&error) {
+					consumer.put_error(std::move(error));
+				}, [consumer] {
+					consumer.put_done();
+				}));
+			});
+		};
+	};
+}
+
+} // namespace rpl
diff --git a/Telegram/gyp/telegram_sources.txt b/Telegram/gyp/telegram_sources.txt
index 73ba528a8..c6f1e7130 100644
--- a/Telegram/gyp/telegram_sources.txt
+++ b/Telegram/gyp/telegram_sources.txt
@@ -401,10 +401,6 @@
 <(src_loc)/profile/profile_userpic_button.h
 <(src_loc)/profile/profile_widget.cpp
 <(src_loc)/profile/profile_widget.h
-<(src_loc)/rpl/consumer.h
-<(src_loc)/rpl/event_stream.h
-<(src_loc)/rpl/lifetime.h
-<(src_loc)/rpl/producer.h
 <(src_loc)/settings/settings_advanced_widget.cpp
 <(src_loc)/settings/settings_advanced_widget.h
 <(src_loc)/settings/settings_background_widget.cpp
diff --git a/Telegram/gyp/tests/tests.gyp b/Telegram/gyp/tests/tests.gyp
index b428e29a2..c3ddd5d5c 100644
--- a/Telegram/gyp/tests/tests.gyp
+++ b/Telegram/gyp/tests/tests.gyp
@@ -92,16 +92,29 @@
       '<(src_loc)/base/flat_set_tests.cpp',
     ],
   }, {
-    'target_name': 'tests_producer',
+    'target_name': 'tests_rpl',
     'includes': [
       'common_test.gypi',
     ],
     'sources': [
+      '<(src_loc)/rpl/before_next.h',
+      '<(src_loc)/rpl/complete.h',
       '<(src_loc)/rpl/consumer.h',
+      '<(src_loc)/rpl/deferred.h',
+      '<(src_loc)/rpl/distinct_until_changed.h',
       '<(src_loc)/rpl/event_stream.h',
+      '<(src_loc)/rpl/fail.h',
+      '<(src_loc)/rpl/filter.h',
+      '<(src_loc)/rpl/flatten_latest.h',
       '<(src_loc)/rpl/lifetime.h',
+      '<(src_loc)/rpl/map.h',
+      '<(src_loc)/rpl/never.h',
+      '<(src_loc)/rpl/operators_tests.cpp',
       '<(src_loc)/rpl/producer.h',
       '<(src_loc)/rpl/producer_tests.cpp',
+      '<(src_loc)/rpl/rpl.h',
+      '<(src_loc)/rpl/single.h',
+      '<(src_loc)/rpl/then.h',
     ],
   }],
 }
diff --git a/Telegram/gyp/tests/tests_list.txt b/Telegram/gyp/tests/tests_list.txt
index d2220189f..c80f601f2 100644
--- a/Telegram/gyp/tests/tests_list.txt
+++ b/Telegram/gyp/tests/tests_list.txt
@@ -2,4 +2,4 @@ tests_algorithm
 tests_flags
 tests_flat_map
 tests_flat_set
-tests_producer
+tests_rpl
\ No newline at end of file