summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2016-03-09 19:10:03 -0800
committerKirk Shoop <kirk.shoop@microsoft.com>2016-03-10 17:24:21 -0800
commit33ccae2842a9558286f2d0a258806939ac748d75 (patch)
tree790dc15140fb3acd2ef8ee7dfe3a7c41a88161b6
parent18605f6fbca9bca6b3230029a928a0aa4fcd3de3 (diff)
downloadRxCpp-33ccae2842a9558286f2d0a258806939ac748d75.tar.gz
make operator| work better
converts linesfrombytes example to use operator|
-rw-r--r--README.md60
-rw-r--r--Rx/v2/examples/linesfrombytes/main.cpp78
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat.hpp21
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat_map.hpp15
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-flat_map.hpp18
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp14
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-merge.hpp5
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-reduce.hpp67
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-repeat.hpp8
-rw-r--r--Rx/v2/src/rxcpp/rx-grouped_observable.hpp8
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp25
11 files changed, 229 insertions, 90 deletions
diff --git a/README.md b/README.md
index ce0b5ab..ee47366 100644
--- a/README.md
+++ b/README.md
@@ -3,24 +3,23 @@ The Reactive Extensions for Native (__RxCpp__) is a library for composing asynch
Windows: [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446)
Linux & OSX: [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp)
+[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
+
+[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
+[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)
+
Github: [![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases)
[![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v2.2.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
NuGet: [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/)
[![NuGet downloads](http://img.shields.io/nuget/dt/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/)
-[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp)
-
-[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
-[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp)
-
#Example
Add ```Rx/v2/src``` to the include paths
[![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html)
```cpp
-
#include "rxcpp/rx.hpp"
using namespace rxcpp;
using namespace rxcpp::sources;
@@ -31,41 +30,42 @@ using namespace rxcpp::util;
#include <random>
using namespace std;
-//using rxcpp::operators::sum;
-
int main()
{
random_device rd; // non-deterministic generator
mt19937 gen(rd());
uniform_int_distribution<> dist(4, 18);
- // produce byte stream that contains lines of text
+ // for testing purposes, produce byte stream that from lines of text
auto bytes = range(1, 10) |
- flat_map([&](int i){
- return from(
- from((uint8_t)('A' + i)) |
- repeat(dist(gen)),
- from((uint8_t)'\r')) |
- concat();
+ flat_map([&](int i){
+ auto body = from((uint8_t)('A' + i)) |
+ repeat(dist(gen)) |
+ as_dynamic();
+ auto delim = from((uint8_t)'\r');
+ return from(body, delim) | concat();
}) |
window(17) |
- flat_map([](observable<uint8_t> w){
+ flat_map([](observable<uint8_t> w){
return w |
reduce(
- vector<uint8_t>(),
+ vector<uint8_t>(),
[](vector<uint8_t>& v, uint8_t b){
- v.push_back(b);
+ v.push_back(b);
return move(v);
- },
- [](vector<uint8_t>& v){return move(v);}) |
- as_dynamic();
+ }) |
+ as_dynamic();
}) |
- filter([](vector<uint8_t>& v){
+ tap([](vector<uint8_t>& v){
+ // print input packet of bytes
copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
- cout << endl;
- return true;
+ cout << endl;
});
+ //
+ // recover lines of text from byte stream
+ //
+
// create strings split on \r
auto strings = bytes |
concat_map([](vector<uint8_t> v){
@@ -75,6 +75,9 @@ int main()
sregex_token_iterator end;
vector<string> splits(cursor, end);
return iterate(move(splits));
+ }) |
+ filter([](string& s){
+ return !s.empty();
});
// group strings by line
@@ -83,18 +86,17 @@ int main()
group_by(
[=](string& s) mutable {
return s.back() == '\r' ? group++ : group;
- },
- [](string& s) { return move(s);});
+ });
// reduce the strings for a line into one string
auto lines = linewindows |
- flat_map([](grouped_observable<int, string> w){
- return w | sum();
+ flat_map([](grouped_observable<int, string> w){
+ return w | sum();
});
// print result
lines |
- subscribe(println(cout));
+ subscribe<string>(println(cout));
return 0;
}
diff --git a/Rx/v2/examples/linesfrombytes/main.cpp b/Rx/v2/examples/linesfrombytes/main.cpp
index 30639a3..b027d13 100644
--- a/Rx/v2/examples/linesfrombytes/main.cpp
+++ b/Rx/v2/examples/linesfrombytes/main.cpp
@@ -2,6 +2,7 @@
#include "rxcpp/rx.hpp"
using namespace rxcpp;
using namespace rxcpp::sources;
+using namespace rxcpp::operators;
using namespace rxcpp::util;
#include <regex>
@@ -14,64 +15,67 @@ int main()
mt19937 gen(rd());
uniform_int_distribution<> dist(4, 18);
- // produce byte stream that contains lines of text
- auto bytes = range(1, 10).
- map([&](int i){
- return from((uint8_t)('A' + i)).
- repeat(dist(gen)).
- concat(from((uint8_t)'\r'));
- }).
- merge().
- window(17).
- map([](observable<uint8_t> w){
- return w.
+ // for testing purposes, produce byte stream that from lines of text
+ auto bytes = range(1, 10) |
+ flat_map([&](int i){
+ auto body = from((uint8_t)('A' + i)) |
+ repeat(dist(gen)) |
+ as_dynamic();
+ auto delim = from((uint8_t)'\r');
+ return from(body, delim) | concat();
+ }) |
+ window(17) |
+ flat_map([](observable<uint8_t> w){
+ return w |
reduce(
- vector<uint8_t>(),
+ vector<uint8_t>(),
[](vector<uint8_t>& v, uint8_t b){
- v.push_back(b);
+ v.push_back(b);
return move(v);
- },
- [](vector<uint8_t>& v){return move(v);}).
- as_dynamic();
- }).
- merge().
- filter([](vector<uint8_t>& v){
+ }) |
+ as_dynamic();
+ }) |
+ tap([](vector<uint8_t>& v){
+ // print input packet of bytes
copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
- cout << endl;
- return true;
+ cout << endl;
});
+ //
+ // recover lines of text from byte stream
+ //
+
// create strings split on \r
- auto strings = bytes.
- map([](vector<uint8_t> v){
+ auto strings = bytes |
+ concat_map([](vector<uint8_t> v){
string s(v.begin(), v.end());
regex delim(R"/(\r)/");
- sregex_token_iterator cursor(s.begin(), s.end(), delim, {-1, 0});
- sregex_token_iterator end;
+ cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
+ cregex_token_iterator end;
vector<string> splits(cursor, end);
return iterate(move(splits));
- }).
- concat();
+ }) |
+ filter([](string& s){
+ return !s.empty();
+ });
// group strings by line
int group = 0;
- auto linewindows = strings.
+ auto linewindows = strings |
group_by(
[=](string& s) mutable {
return s.back() == '\r' ? group++ : group;
- },
- [](string& s) { return move(s);});
+ });
// reduce the strings for a line into one string
- auto lines = linewindows.
- map([](grouped_observable<int, string> w){
- return w.sum();
- }).
- merge();
+ auto lines = linewindows |
+ flat_map([](grouped_observable<int, string> w) {
+ return w | sum();
+ });
// print result
- lines.
- subscribe(println(cout));
+ lines |
+ subscribe<string>(println(cout));
return 0;
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
index a92b8b3..f6d637e 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
@@ -203,12 +203,31 @@ public:
}
-template<class Coordination>
+inline auto concat()
+ -> detail::concat_factory<identity_one_worker> {
+ return detail::concat_factory<identity_one_worker>(identity_current_thread());
+}
+
+template<class Coordination, class Check = typename std::enable_if<is_coordination<Coordination>::value>::type>
auto concat(Coordination&& sf)
-> detail::concat_factory<Coordination> {
return detail::concat_factory<Coordination>(std::forward<Coordination>(sf));
}
+template<class O0, class... ON, class Check = typename std::enable_if<is_observable<O0>::value>::type>
+auto concat(O0&& o0, ON&&... on)
+ -> detail::concat_factory<identity_one_worker> {
+ return detail::concat_factory<identity_one_worker>(identity_current_thread())(from(std::forward<O0>(o0), std::forward<ON>(on)...));
+}
+
+template<class Coordination, class O0, class... ON,
+ class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type,
+ class CheckO = typename std::enable_if<is_observable<O0>::value>::type>
+auto concat(Coordination&& sf, O0&& o0, ON&&... on)
+ -> detail::concat_factory<Coordination> {
+ return detail::concat_factory<Coordination>(std::forward<Coordination>(sf))(from(std::forward<O0>(o0), std::forward<ON>(on)...));
+}
+
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
index 49f50af..d7b32c8 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
@@ -45,7 +45,7 @@ struct concat_traits {
static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");
- typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
+ typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
};
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
@@ -267,6 +267,19 @@ auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
}
+template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
+auto concat_map(CollectionSelector&& s, Coordination&& sf)
+ -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
+ return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
+}
+
+template<class CollectionSelector>
+auto concat_map(CollectionSelector&& s)
+ -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
+ return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
+}
+
+
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
index dcc3521..7f5aae1 100644
--- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
@@ -30,7 +30,7 @@ struct flat_map_traits {
static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
- typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
+ typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;
static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
@@ -43,7 +43,7 @@ struct flat_map_traits {
static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
- typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
+ typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
};
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
@@ -234,8 +234,20 @@ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
return detail::flat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
}
+template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
+auto flat_map(CollectionSelector&& s, Coordination&& sf)
+ -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
+ return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
}
+template<class CollectionSelector>
+auto flat_map(CollectionSelector&& s)
+ -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
+ return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
}
-#endif \ No newline at end of file
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index 203f3c9..6da0af0 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -190,7 +190,7 @@ public:
template<class Observable>
struct group_by_factory_traits
{
- typedef rxu::value_type_t<Observable> value_type;
+ typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
};
@@ -209,6 +209,18 @@ inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p)
return detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p));
}
+template<class KeySelector, class MarbleSelector>
+inline auto group_by(KeySelector ks, MarbleSelector ms)
+ -> detail::group_by_factory<KeySelector, MarbleSelector, rxu::less> {
+ return detail::group_by_factory<KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less());
+}
+
+template<class KeySelector>
+inline auto group_by(KeySelector ks)
+ -> detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less> {
+ return detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less>(std::move(ks), rxu::take_at<0>(), rxu::less());
+}
+
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-merge.hpp b/Rx/v2/src/rxcpp/operators/rx-merge.hpp
index 17409d3..c46e752 100644
--- a/Rx/v2/src/rxcpp/operators/rx-merge.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-merge.hpp
@@ -182,6 +182,11 @@ public:
}
+inline auto merge()
+ -> detail::merge_factory<identity_one_worker> {
+ return detail::merge_factory<identity_one_worker>(identity_current_thread());
+}
+
template<class Coordination>
auto merge(Coordination&& sf)
-> detail::merge_factory<Coordination> {
diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
index 144c225..c3e234f 100644
--- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
@@ -51,7 +51,7 @@ struct is_result_function_for {
template<class CS, class CRS>
static tag_not_valid check(...);
- typedef decltype(check<seed_type, result_selector_type>(0)) type;
+ typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type;
static const bool value = !std::is_same<type, tag_not_valid>::value;
};
@@ -69,7 +69,7 @@ struct reduce_traits
static_assert(is_result_function_for<seed_type, result_selector_type>::value, "reduce ResultSelector must be a function with the signature reduce::value_type(Seed)");
- typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type;
+ typedef rxu::decay_t<typename is_result_function_for<seed_type, result_selector_type>::type> value_type;
};
template<class T, class SourceOperator, class Accumulator, class ResultSelector, class Seed>
@@ -189,6 +189,22 @@ public:
}
};
+template<template<class T> class Factory>
+class delay_reduce_factory
+{
+ template<class Observable> using accumulator_t = Factory<rxu::value_type_t<Observable>>;
+ template<class Observable> using result_selector_t = Factory<rxu::value_type_t<Observable>>;
+ template<class Observable> using seed_t = typename Factory<rxu::value_type_t<Observable>>::seed_type;
+ template<class Observable> using result_value_t = decltype(result_selector_t<Observable>()(*(seed_t<Observable>*)nullptr));
+public:
+ template<class Observable>
+ auto operator()(const Observable& source)
+ -> observable<result_value_t<Observable>, reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>> {
+ return observable<result_value_t<Observable>, reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>>(
+ reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>(source.source_operator, accumulator_t<Observable>(), result_selector_t<Observable>(), accumulator_t<Observable>().seed()));
+ }
+};
+
template<class T>
struct initialize_seeder {
typedef T seed_type;
@@ -213,7 +229,7 @@ struct average {
seed_type seed() {
return seed_type{};
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.count != 0 &&
(a.count == std::numeric_limits<int>::max() ||
((v > 0) && (a.value > (std::numeric_limits<T>::max() - v))) ||
@@ -235,7 +251,7 @@ struct average {
}
return a;
}
- double operator()(seed_type a) {
+ double operator()(seed_type& a) {
if (a.count > 0) {
double avg = a.value / a.count;
if (!a.stage.empty()) {
@@ -253,14 +269,14 @@ struct sum {
seed_type seed() {
return seed_type();
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.empty())
a.reset(v);
else
*a = *a + v;
return a;
}
- T operator()(seed_type a) {
+ T operator()(seed_type& a) {
if (a.empty())
throw rxcpp::empty_error("sum() requires a stream with at least one value");
return *a;
@@ -273,14 +289,14 @@ struct max {
seed_type seed() {
return seed_type();
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.empty())
a.reset(v);
else
*a = (v < *a ? *a : v);
return a;
}
- T operator()(seed_type a) {
+ T operator()(seed_type& a) {
if (a.empty())
throw rxcpp::empty_error("max() requires a stream with at least one value");
return *a;
@@ -293,14 +309,14 @@ struct min {
seed_type seed() {
return seed_type();
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.empty())
a.reset(v);
else
*a = (*a < v ? *a : v);
return a;
}
- T operator()(seed_type a) {
+ T operator()(seed_type& a) {
if (a.empty())
throw rxcpp::empty_error("min() requires a stream with at least one value");
return *a;
@@ -315,6 +331,37 @@ auto reduce(Seed s, Accumulator a, ResultSelector rs)
return detail::reduce_factory<Accumulator, ResultSelector, Seed>(std::move(a), std::move(rs), std::move(s));
}
+template<class Seed, class Accumulator>
+auto reduce(Seed s, Accumulator a)
+ -> detail::reduce_factory<Accumulator, rxu::detail::take_at<0>, Seed> {
+ return detail::reduce_factory<Accumulator, rxu::detail::take_at<0>, Seed>(std::move(a), rxu::take_at<0>(), std::move(s));
+}
+
+inline auto count()
+ -> detail::reduce_factory<rxu::count, rxu::detail::take_at<0>, int> {
+ return detail::reduce_factory<rxu::count, rxu::detail::take_at<0>, int>(rxu::count(), rxu::take_at<0>(), 0);
+}
+
+inline auto average()
+ -> detail::delay_reduce_factory<detail::average> {
+ return detail::delay_reduce_factory<detail::average>();
+}
+
+inline auto sum()
+ -> detail::delay_reduce_factory<detail::sum> {
+ return detail::delay_reduce_factory<detail::sum>();
+}
+
+inline auto min()
+ -> detail::delay_reduce_factory<detail::min> {
+ return detail::delay_reduce_factory<detail::min>();
+}
+
+inline auto max()
+ -> detail::delay_reduce_factory<detail::max> {
+ return detail::delay_reduce_factory<detail::max>();
+}
+
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
index 438f461..90de9a0 100644
--- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
@@ -105,9 +105,9 @@ public:
template<class Observable>
auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>> {
- return observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>>(
- repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>(std::forward<Observable>(source), count));
+ -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>> {
+ return observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>>(
+ repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>(std::forward<Observable>(source), count));
}
};
@@ -115,7 +115,7 @@ public:
template<class T>
auto repeat(T&& t)
--> detail::repeat_factory<T> {
+ -> detail::repeat_factory<T> {
return detail::repeat_factory<T>(std::forward<T>(t));
}
diff --git a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
index b532d6e..031bc53 100644
--- a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp
@@ -174,8 +174,8 @@ public:
// support range() >> filter() >> subscribe() syntax
// '>>' is spelled 'stream'
//
-template<class T, class SourceOperator, class OperatorFactory>
-auto operator >> (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of)
+template<class K, class T, class SourceOperator, class OperatorFactory>
+auto operator >> (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
-> decltype(source.op(std::forward<OperatorFactory>(of))) {
return source.op(std::forward<OperatorFactory>(of));
}
@@ -184,8 +184,8 @@ auto operator >> (const rxcpp::grouped_observable<T, SourceOperator>& source, Op
// support range() | filter() | subscribe() syntax
// '|' is spelled 'pipe'
//
-template<class T, class SourceOperator, class OperatorFactory>
-auto operator | (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of)
+template<class K, class T, class SourceOperator, class OperatorFactory>
+auto operator | (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of)
-> decltype(source.op(std::forward<OperatorFactory>(of))) {
return source.op(std::forward<OperatorFactory>(of));
}
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index a4fb758..dcdc99d 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -214,6 +214,31 @@ inline auto pack()
return detail::pack();
}
+namespace detail {
+
+template<int Index>
+struct take_at
+{
+ template<class... ParamN>
+ auto operator()(ParamN... pn)
+ -> decay_t<decltype(std::get<Index>(std::make_tuple(std::move(pn)...)))> {
+ return std::get<Index>(std::make_tuple(std::move(pn)...));
+ }
+ template<class... ParamN>
+ auto operator()(ParamN... pn) const
+ -> decay_t<decltype(std::get<Index>(std::make_tuple(std::move(pn)...)))> {
+ return std::get<Index>(std::make_tuple(std::move(pn)...));
+ }
+};
+
+}
+
+template<int Index>
+inline auto take_at()
+ -> detail::take_at<Index> {
+ return detail::take_at<Index>();
+}
+
template <class D>
struct resolve_type;