summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2016-03-09 19:10:03 -0800
committerKirk Shoop <kirk.shoop@microsoft.com>2016-03-10 17:24:21 -0800
commit33ccae2842a9558286f2d0a258806939ac748d75 (patch)
tree790dc15140fb3acd2ef8ee7dfe3a7c41a88161b6 /Rx/v2/src/rxcpp/operators
parent18605f6fbca9bca6b3230029a928a0aa4fcd3de3 (diff)
downloadRxCpp-33ccae2842a9558286f2d0a258806939ac748d75.tar.gz
make operator| work better
converts linesfrombytes example to use operator|
Diffstat (limited to 'Rx/v2/src/rxcpp/operators')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat.hpp21
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat_map.hpp15
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-flat_map.hpp18
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp14
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-merge.hpp5
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-reduce.hpp67
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-repeat.hpp8
7 files changed, 128 insertions, 20 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
index a92b8b3..f6d637e 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
@@ -203,12 +203,31 @@ public:
}
-template<class Coordination>
+inline auto concat()
+ -> detail::concat_factory<identity_one_worker> {
+ return detail::concat_factory<identity_one_worker>(identity_current_thread());
+}
+
+template<class Coordination, class Check = typename std::enable_if<is_coordination<Coordination>::value>::type>
auto concat(Coordination&& sf)
-> detail::concat_factory<Coordination> {
return detail::concat_factory<Coordination>(std::forward<Coordination>(sf));
}
+template<class O0, class... ON, class Check = typename std::enable_if<is_observable<O0>::value>::type>
+auto concat(O0&& o0, ON&&... on)
+ -> detail::concat_factory<identity_one_worker> {
+ return detail::concat_factory<identity_one_worker>(identity_current_thread())(from(std::forward<O0>(o0), std::forward<ON>(on)...));
+}
+
+template<class Coordination, class O0, class... ON,
+ class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type,
+ class CheckO = typename std::enable_if<is_observable<O0>::value>::type>
+auto concat(Coordination&& sf, O0&& o0, ON&&... on)
+ -> detail::concat_factory<Coordination> {
+ return detail::concat_factory<Coordination>(std::forward<Coordination>(sf))(from(std::forward<O0>(o0), std::forward<ON>(on)...));
+}
+
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
index 49f50af..d7b32c8 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
@@ -45,7 +45,7 @@ struct concat_traits {
static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)");
- typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
+ typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
};
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
@@ -267,6 +267,19 @@ auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
}
+template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
+auto concat_map(CollectionSelector&& s, Coordination&& sf)
+ -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
+ return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
+}
+
+template<class CollectionSelector>
+auto concat_map(CollectionSelector&& s)
+ -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
+ return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
+}
+
+
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
index dcc3521..7f5aae1 100644
--- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
@@ -30,7 +30,7 @@ struct flat_map_traits {
static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)");
- typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type;
+ typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type;
static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable");
@@ -43,7 +43,7 @@ struct flat_map_traits {
static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)");
- typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type;
+ typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type;
};
template<class Observable, class CollectionSelector, class ResultSelector, class Coordination>
@@ -234,8 +234,20 @@ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf)
return detail::flat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf));
}
+template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type>
+auto flat_map(CollectionSelector&& s, Coordination&& sf)
+ -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> {
+ return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf));
}
+template<class CollectionSelector>
+auto flat_map(CollectionSelector&& s)
+ -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> {
+ return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread());
}
-#endif \ No newline at end of file
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index 203f3c9..6da0af0 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -190,7 +190,7 @@ public:
template<class Observable>
struct group_by_factory_traits
{
- typedef rxu::value_type_t<Observable> value_type;
+ typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type;
typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type;
typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type;
};
@@ -209,6 +209,18 @@ inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p)
return detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p));
}
+template<class KeySelector, class MarbleSelector>
+inline auto group_by(KeySelector ks, MarbleSelector ms)
+ -> detail::group_by_factory<KeySelector, MarbleSelector, rxu::less> {
+ return detail::group_by_factory<KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less());
+}
+
+template<class KeySelector>
+inline auto group_by(KeySelector ks)
+ -> detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less> {
+ return detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less>(std::move(ks), rxu::take_at<0>(), rxu::less());
+}
+
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-merge.hpp b/Rx/v2/src/rxcpp/operators/rx-merge.hpp
index 17409d3..c46e752 100644
--- a/Rx/v2/src/rxcpp/operators/rx-merge.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-merge.hpp
@@ -182,6 +182,11 @@ public:
}
+inline auto merge()
+ -> detail::merge_factory<identity_one_worker> {
+ return detail::merge_factory<identity_one_worker>(identity_current_thread());
+}
+
template<class Coordination>
auto merge(Coordination&& sf)
-> detail::merge_factory<Coordination> {
diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
index 144c225..c3e234f 100644
--- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
@@ -51,7 +51,7 @@ struct is_result_function_for {
template<class CS, class CRS>
static tag_not_valid check(...);
- typedef decltype(check<seed_type, result_selector_type>(0)) type;
+ typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type;
static const bool value = !std::is_same<type, tag_not_valid>::value;
};
@@ -69,7 +69,7 @@ struct reduce_traits
static_assert(is_result_function_for<seed_type, result_selector_type>::value, "reduce ResultSelector must be a function with the signature reduce::value_type(Seed)");
- typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type;
+ typedef rxu::decay_t<typename is_result_function_for<seed_type, result_selector_type>::type> value_type;
};
template<class T, class SourceOperator, class Accumulator, class ResultSelector, class Seed>
@@ -189,6 +189,22 @@ public:
}
};
+template<template<class T> class Factory>
+class delay_reduce_factory
+{
+ template<class Observable> using accumulator_t = Factory<rxu::value_type_t<Observable>>;
+ template<class Observable> using result_selector_t = Factory<rxu::value_type_t<Observable>>;
+ template<class Observable> using seed_t = typename Factory<rxu::value_type_t<Observable>>::seed_type;
+ template<class Observable> using result_value_t = decltype(result_selector_t<Observable>()(*(seed_t<Observable>*)nullptr));
+public:
+ template<class Observable>
+ auto operator()(const Observable& source)
+ -> observable<result_value_t<Observable>, reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>> {
+ return observable<result_value_t<Observable>, reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>>(
+ reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>(source.source_operator, accumulator_t<Observable>(), result_selector_t<Observable>(), accumulator_t<Observable>().seed()));
+ }
+};
+
template<class T>
struct initialize_seeder {
typedef T seed_type;
@@ -213,7 +229,7 @@ struct average {
seed_type seed() {
return seed_type{};
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.count != 0 &&
(a.count == std::numeric_limits<int>::max() ||
((v > 0) && (a.value > (std::numeric_limits<T>::max() - v))) ||
@@ -235,7 +251,7 @@ struct average {
}
return a;
}
- double operator()(seed_type a) {
+ double operator()(seed_type& a) {
if (a.count > 0) {
double avg = a.value / a.count;
if (!a.stage.empty()) {
@@ -253,14 +269,14 @@ struct sum {
seed_type seed() {
return seed_type();
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.empty())
a.reset(v);
else
*a = *a + v;
return a;
}
- T operator()(seed_type a) {
+ T operator()(seed_type& a) {
if (a.empty())
throw rxcpp::empty_error("sum() requires a stream with at least one value");
return *a;
@@ -273,14 +289,14 @@ struct max {
seed_type seed() {
return seed_type();
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.empty())
a.reset(v);
else
*a = (v < *a ? *a : v);
return a;
}
- T operator()(seed_type a) {
+ T operator()(seed_type& a) {
if (a.empty())
throw rxcpp::empty_error("max() requires a stream with at least one value");
return *a;
@@ -293,14 +309,14 @@ struct min {
seed_type seed() {
return seed_type();
}
- seed_type operator()(seed_type a, T v) {
+ seed_type operator()(seed_type& a, T v) {
if (a.empty())
a.reset(v);
else
*a = (*a < v ? *a : v);
return a;
}
- T operator()(seed_type a) {
+ T operator()(seed_type& a) {
if (a.empty())
throw rxcpp::empty_error("min() requires a stream with at least one value");
return *a;
@@ -315,6 +331,37 @@ auto reduce(Seed s, Accumulator a, ResultSelector rs)
return detail::reduce_factory<Accumulator, ResultSelector, Seed>(std::move(a), std::move(rs), std::move(s));
}
+template<class Seed, class Accumulator>
+auto reduce(Seed s, Accumulator a)
+ -> detail::reduce_factory<Accumulator, rxu::detail::take_at<0>, Seed> {
+ return detail::reduce_factory<Accumulator, rxu::detail::take_at<0>, Seed>(std::move(a), rxu::take_at<0>(), std::move(s));
+}
+
+inline auto count()
+ -> detail::reduce_factory<rxu::count, rxu::detail::take_at<0>, int> {
+ return detail::reduce_factory<rxu::count, rxu::detail::take_at<0>, int>(rxu::count(), rxu::take_at<0>(), 0);
+}
+
+inline auto average()
+ -> detail::delay_reduce_factory<detail::average> {
+ return detail::delay_reduce_factory<detail::average>();
+}
+
+inline auto sum()
+ -> detail::delay_reduce_factory<detail::sum> {
+ return detail::delay_reduce_factory<detail::sum>();
+}
+
+inline auto min()
+ -> detail::delay_reduce_factory<detail::min> {
+ return detail::delay_reduce_factory<detail::min>();
+}
+
+inline auto max()
+ -> detail::delay_reduce_factory<detail::max> {
+ return detail::delay_reduce_factory<detail::max>();
+}
+
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
index 438f461..90de9a0 100644
--- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
@@ -105,9 +105,9 @@ public:
template<class Observable>
auto operator()(Observable&& source)
- -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>> {
- return observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>>(
- repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>(std::forward<Observable>(source), count));
+ -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>> {
+ return observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>>(
+ repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>(std::forward<Observable>(source), count));
}
};
@@ -115,7 +115,7 @@ public:
template<class T>
auto repeat(T&& t)
--> detail::repeat_factory<T> {
+ -> detail::repeat_factory<T> {
return detail::repeat_factory<T>(std::forward<T>(t));
}