diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-03-09 19:10:03 -0800 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-03-10 17:24:21 -0800 |
commit | 33ccae2842a9558286f2d0a258806939ac748d75 (patch) | |
tree | 790dc15140fb3acd2ef8ee7dfe3a7c41a88161b6 | |
parent | 18605f6fbca9bca6b3230029a928a0aa4fcd3de3 (diff) | |
download | RxCpp-33ccae2842a9558286f2d0a258806939ac748d75.tar.gz |
make operator| work better
converts linesfrombytes example to use operator|
-rw-r--r-- | README.md | 60 | ||||
-rw-r--r-- | Rx/v2/examples/linesfrombytes/main.cpp | 78 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-concat.hpp | 21 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-concat_map.hpp | 15 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-flat_map.hpp | 18 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-group_by.hpp | 14 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-merge.hpp | 5 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-reduce.hpp | 67 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-repeat.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-grouped_observable.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-util.hpp | 25 |
11 files changed, 229 insertions, 90 deletions
@@ -3,24 +3,23 @@ The Reactive Extensions for Native (__RxCpp__) is a library for composing asynch Windows: [![Windows Status](http://img.shields.io/appveyor/ci/kirkshoop/RxCpp-446.svg?style=flat-square)](https://ci.appveyor.com/project/kirkshoop/rxcpp-446) Linux & OSX: [![Linux & Osx Status](http://img.shields.io/travis/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://travis-ci.org/Reactive-Extensions/RxCpp) +[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) + +[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp) + Github: [![GitHub release](https://img.shields.io/github/release/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp/releases) [![GitHub commits](https://img.shields.io/github/commits-since/Reactive-Extensions/RxCpp/v2.2.0.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) NuGet: [![NuGet version](http://img.shields.io/nuget/v/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) [![NuGet downloads](http://img.shields.io/nuget/dt/RxCpp.svg?style=flat-square)](http://www.nuget.org/packages/RxCpp/) -[![GitHub license](https://img.shields.io/github/license/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://github.com/Reactive-Extensions/RxCpp) - -[![Join in on gitter.im](https://img.shields.io/gitter/room/Reactive-Extensions/RxCpp.svg?style=flat-square)](https://gitter.im/Reactive-Extensions/RxCpp?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) -[![doxygen documentation](https://img.shields.io/badge/documentation-latest-brightgreen.svg?style=flat-square)](http://reactive-extensions.github.io/RxCpp) - #Example Add ```Rx/v2/src``` to the include paths [![lines from bytes](https://img.shields.io/badge/blog%20post-lines%20from%20bytes-blue.svg?style=flat-square)](http://kirkshoop.github.io/async/rxcpp/c++/2015/07/07/rxcpp_-_parsing_bytes_to_lines_of_text.html) ```cpp - #include "rxcpp/rx.hpp" using namespace rxcpp; using namespace rxcpp::sources; @@ -31,41 +30,42 @@ using namespace rxcpp::util; #include <random> using namespace std; -//using rxcpp::operators::sum; - int main() { random_device rd; // non-deterministic generator mt19937 gen(rd()); uniform_int_distribution<> dist(4, 18); - // produce byte stream that contains lines of text + // for testing purposes, produce byte stream that from lines of text auto bytes = range(1, 10) | - flat_map([&](int i){ - return from( - from((uint8_t)('A' + i)) | - repeat(dist(gen)), - from((uint8_t)'\r')) | - concat(); + flat_map([&](int i){ + auto body = from((uint8_t)('A' + i)) | + repeat(dist(gen)) | + as_dynamic(); + auto delim = from((uint8_t)'\r'); + return from(body, delim) | concat(); }) | window(17) | - flat_map([](observable<uint8_t> w){ + flat_map([](observable<uint8_t> w){ return w | reduce( - vector<uint8_t>(), + vector<uint8_t>(), [](vector<uint8_t>& v, uint8_t b){ - v.push_back(b); + v.push_back(b); return move(v); - }, - [](vector<uint8_t>& v){return move(v);}) | - as_dynamic(); + }) | + as_dynamic(); }) | - filter([](vector<uint8_t>& v){ + tap([](vector<uint8_t>& v){ + // print input packet of bytes copy(v.begin(), v.end(), ostream_iterator<long>(cout, " ")); - cout << endl; - return true; + cout << endl; }); + // + // recover lines of text from byte stream + // + // create strings split on \r auto strings = bytes | concat_map([](vector<uint8_t> v){ @@ -75,6 +75,9 @@ int main() sregex_token_iterator end; vector<string> splits(cursor, end); return iterate(move(splits)); + }) | + filter([](string& s){ + return !s.empty(); }); // group strings by line @@ -83,18 +86,17 @@ int main() group_by( [=](string& s) mutable { return s.back() == '\r' ? group++ : group; - }, - [](string& s) { return move(s);}); + }); // reduce the strings for a line into one string auto lines = linewindows | - flat_map([](grouped_observable<int, string> w){ - return w | sum(); + flat_map([](grouped_observable<int, string> w){ + return w | sum(); }); // print result lines | - subscribe(println(cout)); + subscribe<string>(println(cout)); return 0; } diff --git a/Rx/v2/examples/linesfrombytes/main.cpp b/Rx/v2/examples/linesfrombytes/main.cpp index 30639a3..b027d13 100644 --- a/Rx/v2/examples/linesfrombytes/main.cpp +++ b/Rx/v2/examples/linesfrombytes/main.cpp @@ -2,6 +2,7 @@ #include "rxcpp/rx.hpp" using namespace rxcpp; using namespace rxcpp::sources; +using namespace rxcpp::operators; using namespace rxcpp::util; #include <regex> @@ -14,64 +15,67 @@ int main() mt19937 gen(rd()); uniform_int_distribution<> dist(4, 18); - // produce byte stream that contains lines of text - auto bytes = range(1, 10). - map([&](int i){ - return from((uint8_t)('A' + i)). - repeat(dist(gen)). - concat(from((uint8_t)'\r')); - }). - merge(). - window(17). - map([](observable<uint8_t> w){ - return w. + // for testing purposes, produce byte stream that from lines of text + auto bytes = range(1, 10) | + flat_map([&](int i){ + auto body = from((uint8_t)('A' + i)) | + repeat(dist(gen)) | + as_dynamic(); + auto delim = from((uint8_t)'\r'); + return from(body, delim) | concat(); + }) | + window(17) | + flat_map([](observable<uint8_t> w){ + return w | reduce( - vector<uint8_t>(), + vector<uint8_t>(), [](vector<uint8_t>& v, uint8_t b){ - v.push_back(b); + v.push_back(b); return move(v); - }, - [](vector<uint8_t>& v){return move(v);}). - as_dynamic(); - }). - merge(). - filter([](vector<uint8_t>& v){ + }) | + as_dynamic(); + }) | + tap([](vector<uint8_t>& v){ + // print input packet of bytes copy(v.begin(), v.end(), ostream_iterator<long>(cout, " ")); - cout << endl; - return true; + cout << endl; }); + // + // recover lines of text from byte stream + // + // create strings split on \r - auto strings = bytes. - map([](vector<uint8_t> v){ + auto strings = bytes | + concat_map([](vector<uint8_t> v){ string s(v.begin(), v.end()); regex delim(R"/(\r)/"); - sregex_token_iterator cursor(s.begin(), s.end(), delim, {-1, 0}); - sregex_token_iterator end; + cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0}); + cregex_token_iterator end; vector<string> splits(cursor, end); return iterate(move(splits)); - }). - concat(); + }) | + filter([](string& s){ + return !s.empty(); + }); // group strings by line int group = 0; - auto linewindows = strings. + auto linewindows = strings | group_by( [=](string& s) mutable { return s.back() == '\r' ? group++ : group; - }, - [](string& s) { return move(s);}); + }); // reduce the strings for a line into one string - auto lines = linewindows. - map([](grouped_observable<int, string> w){ - return w.sum(); - }). - merge(); + auto lines = linewindows | + flat_map([](grouped_observable<int, string> w) { + return w | sum(); + }); // print result - lines. - subscribe(println(cout)); + lines | + subscribe<string>(println(cout)); return 0; } diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp index a92b8b3..f6d637e 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp @@ -203,12 +203,31 @@ public: } -template<class Coordination> +inline auto concat() + -> detail::concat_factory<identity_one_worker> { + return detail::concat_factory<identity_one_worker>(identity_current_thread()); +} + +template<class Coordination, class Check = typename std::enable_if<is_coordination<Coordination>::value>::type> auto concat(Coordination&& sf) -> detail::concat_factory<Coordination> { return detail::concat_factory<Coordination>(std::forward<Coordination>(sf)); } +template<class O0, class... ON, class Check = typename std::enable_if<is_observable<O0>::value>::type> +auto concat(O0&& o0, ON&&... on) + -> detail::concat_factory<identity_one_worker> { + return detail::concat_factory<identity_one_worker>(identity_current_thread())(from(std::forward<O0>(o0), std::forward<ON>(on)...)); +} + +template<class Coordination, class O0, class... ON, + class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type, + class CheckO = typename std::enable_if<is_observable<O0>::value>::type> +auto concat(Coordination&& sf, O0&& o0, ON&&... on) + -> detail::concat_factory<Coordination> { + return detail::concat_factory<Coordination>(std::forward<Coordination>(sf))(from(std::forward<O0>(o0), std::forward<ON>(on)...)); +} + } } diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp index 49f50af..d7b32c8 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp @@ -45,7 +45,7 @@ struct concat_traits { static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "concat_map ResultSelector must be a function with the signature concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type)"); - typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type; + typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type; }; template<class Observable, class CollectionSelector, class ResultSelector, class Coordination> @@ -267,6 +267,19 @@ auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf)); } +template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type> +auto concat_map(CollectionSelector&& s, Coordination&& sf) + -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> { + return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf)); +} + +template<class CollectionSelector> +auto concat_map(CollectionSelector&& s) + -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> { + return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread()); +} + + } } diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp index dcc3521..7f5aae1 100644 --- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp @@ -30,7 +30,7 @@ struct flat_map_traits { static_assert(!std::is_same<decltype(collection_check<source_value_type, collection_selector_type>(0)), tag_not_valid>::value, "flat_map CollectionSelector must be a function with the signature observable(flat_map::source_value_type)"); - typedef decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr))) collection_type; + typedef rxu::decay_t<decltype((*(collection_selector_type*)nullptr)((*(source_value_type*)nullptr)))> collection_type; static_assert(is_observable<collection_type>::value, "flat_map CollectionSelector must return an observable"); @@ -43,7 +43,7 @@ struct flat_map_traits { static_assert(!std::is_same<decltype(result_check<source_value_type, collection_value_type, result_selector_type>(0)), tag_not_valid>::value, "flat_map ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type)"); - typedef decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr)) value_type; + typedef rxu::decay_t<decltype((*(result_selector_type*)nullptr)(*(source_value_type*)nullptr, *(collection_value_type*)nullptr))> value_type; }; template<class Observable, class CollectionSelector, class ResultSelector, class Coordination> @@ -234,8 +234,20 @@ auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) return detail::flat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf)); } +template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type> +auto flat_map(CollectionSelector&& s, Coordination&& sf) + -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> { + return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf)); } +template<class CollectionSelector> +auto flat_map(CollectionSelector&& s) + -> detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> { + return detail::flat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread()); } -#endif
\ No newline at end of file +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp index 203f3c9..6da0af0 100644 --- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp @@ -190,7 +190,7 @@ public: template<class Observable> struct group_by_factory_traits { - typedef rxu::value_type_t<Observable> value_type; + typedef rxu::value_type_t<rxu::decay_t<Observable>> value_type; typedef detail::group_by_traits<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> traits_type; typedef detail::group_by<value_type, Observable, KeySelector, MarbleSelector, BinaryPredicate> group_by_type; }; @@ -209,6 +209,18 @@ inline auto group_by(KeySelector ks, MarbleSelector ms, BinaryPredicate p) return detail::group_by_factory<KeySelector, MarbleSelector, BinaryPredicate>(std::move(ks), std::move(ms), std::move(p)); } +template<class KeySelector, class MarbleSelector> +inline auto group_by(KeySelector ks, MarbleSelector ms) + -> detail::group_by_factory<KeySelector, MarbleSelector, rxu::less> { + return detail::group_by_factory<KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less()); +} + +template<class KeySelector> +inline auto group_by(KeySelector ks) + -> detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less> { + return detail::group_by_factory<KeySelector, rxu::detail::take_at<0>, rxu::less>(std::move(ks), rxu::take_at<0>(), rxu::less()); +} + } diff --git a/Rx/v2/src/rxcpp/operators/rx-merge.hpp b/Rx/v2/src/rxcpp/operators/rx-merge.hpp index 17409d3..c46e752 100644 --- a/Rx/v2/src/rxcpp/operators/rx-merge.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-merge.hpp @@ -182,6 +182,11 @@ public: } +inline auto merge() + -> detail::merge_factory<identity_one_worker> { + return detail::merge_factory<identity_one_worker>(identity_current_thread()); +} + template<class Coordination> auto merge(Coordination&& sf) -> detail::merge_factory<Coordination> { diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp index 144c225..c3e234f 100644 --- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp @@ -51,7 +51,7 @@ struct is_result_function_for { template<class CS, class CRS> static tag_not_valid check(...); - typedef decltype(check<seed_type, result_selector_type>(0)) type; + typedef rxu::decay_t<decltype(check<seed_type, result_selector_type>(0))> type; static const bool value = !std::is_same<type, tag_not_valid>::value; }; @@ -69,7 +69,7 @@ struct reduce_traits static_assert(is_result_function_for<seed_type, result_selector_type>::value, "reduce ResultSelector must be a function with the signature reduce::value_type(Seed)"); - typedef typename is_result_function_for<seed_type, result_selector_type>::type value_type; + typedef rxu::decay_t<typename is_result_function_for<seed_type, result_selector_type>::type> value_type; }; template<class T, class SourceOperator, class Accumulator, class ResultSelector, class Seed> @@ -189,6 +189,22 @@ public: } }; +template<template<class T> class Factory> +class delay_reduce_factory +{ + template<class Observable> using accumulator_t = Factory<rxu::value_type_t<Observable>>; + template<class Observable> using result_selector_t = Factory<rxu::value_type_t<Observable>>; + template<class Observable> using seed_t = typename Factory<rxu::value_type_t<Observable>>::seed_type; + template<class Observable> using result_value_t = decltype(result_selector_t<Observable>()(*(seed_t<Observable>*)nullptr)); +public: + template<class Observable> + auto operator()(const Observable& source) + -> observable<result_value_t<Observable>, reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>> { + return observable<result_value_t<Observable>, reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>>( + reduce<rxu::value_type_t<Observable>, typename Observable::source_operator_type, accumulator_t<Observable>, result_selector_t<Observable>, seed_t<Observable>>(source.source_operator, accumulator_t<Observable>(), result_selector_t<Observable>(), accumulator_t<Observable>().seed())); + } +}; + template<class T> struct initialize_seeder { typedef T seed_type; @@ -213,7 +229,7 @@ struct average { seed_type seed() { return seed_type{}; } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.count != 0 && (a.count == std::numeric_limits<int>::max() || ((v > 0) && (a.value > (std::numeric_limits<T>::max() - v))) || @@ -235,7 +251,7 @@ struct average { } return a; } - double operator()(seed_type a) { + double operator()(seed_type& a) { if (a.count > 0) { double avg = a.value / a.count; if (!a.stage.empty()) { @@ -253,14 +269,14 @@ struct sum { seed_type seed() { return seed_type(); } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.empty()) a.reset(v); else *a = *a + v; return a; } - T operator()(seed_type a) { + T operator()(seed_type& a) { if (a.empty()) throw rxcpp::empty_error("sum() requires a stream with at least one value"); return *a; @@ -273,14 +289,14 @@ struct max { seed_type seed() { return seed_type(); } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.empty()) a.reset(v); else *a = (v < *a ? *a : v); return a; } - T operator()(seed_type a) { + T operator()(seed_type& a) { if (a.empty()) throw rxcpp::empty_error("max() requires a stream with at least one value"); return *a; @@ -293,14 +309,14 @@ struct min { seed_type seed() { return seed_type(); } - seed_type operator()(seed_type a, T v) { + seed_type operator()(seed_type& a, T v) { if (a.empty()) a.reset(v); else *a = (*a < v ? *a : v); return a; } - T operator()(seed_type a) { + T operator()(seed_type& a) { if (a.empty()) throw rxcpp::empty_error("min() requires a stream with at least one value"); return *a; @@ -315,6 +331,37 @@ auto reduce(Seed s, Accumulator a, ResultSelector rs) return detail::reduce_factory<Accumulator, ResultSelector, Seed>(std::move(a), std::move(rs), std::move(s)); } +template<class Seed, class Accumulator> +auto reduce(Seed s, Accumulator a) + -> detail::reduce_factory<Accumulator, rxu::detail::take_at<0>, Seed> { + return detail::reduce_factory<Accumulator, rxu::detail::take_at<0>, Seed>(std::move(a), rxu::take_at<0>(), std::move(s)); +} + +inline auto count() + -> detail::reduce_factory<rxu::count, rxu::detail::take_at<0>, int> { + return detail::reduce_factory<rxu::count, rxu::detail::take_at<0>, int>(rxu::count(), rxu::take_at<0>(), 0); +} + +inline auto average() + -> detail::delay_reduce_factory<detail::average> { + return detail::delay_reduce_factory<detail::average>(); +} + +inline auto sum() + -> detail::delay_reduce_factory<detail::sum> { + return detail::delay_reduce_factory<detail::sum>(); +} + +inline auto min() + -> detail::delay_reduce_factory<detail::min> { + return detail::delay_reduce_factory<detail::min>(); +} + +inline auto max() + -> detail::delay_reduce_factory<detail::max> { + return detail::delay_reduce_factory<detail::max>(); +} + } } diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp index 438f461..90de9a0 100644 --- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp @@ -105,9 +105,9 @@ public: template<class Observable> auto operator()(Observable&& source) - -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>> { - return observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>>( - repeat<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, count_type>(std::forward<Observable>(source), count)); + -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>> { + return observable<rxu::value_type_t<rxu::decay_t<Observable>>, repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>>( + repeat<rxu::value_type_t<rxu::decay_t<Observable>>, rxu::decay_t<Observable>, count_type>(std::forward<Observable>(source), count)); } }; @@ -115,7 +115,7 @@ public: template<class T> auto repeat(T&& t) --> detail::repeat_factory<T> { + -> detail::repeat_factory<T> { return detail::repeat_factory<T>(std::forward<T>(t)); } diff --git a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp index b532d6e..031bc53 100644 --- a/Rx/v2/src/rxcpp/rx-grouped_observable.hpp +++ b/Rx/v2/src/rxcpp/rx-grouped_observable.hpp @@ -174,8 +174,8 @@ public: // support range() >> filter() >> subscribe() syntax // '>>' is spelled 'stream' // -template<class T, class SourceOperator, class OperatorFactory> -auto operator >> (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of) +template<class K, class T, class SourceOperator, class OperatorFactory> +auto operator >> (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of) -> decltype(source.op(std::forward<OperatorFactory>(of))) { return source.op(std::forward<OperatorFactory>(of)); } @@ -184,8 +184,8 @@ auto operator >> (const rxcpp::grouped_observable<T, SourceOperator>& source, Op // support range() | filter() | subscribe() syntax // '|' is spelled 'pipe' // -template<class T, class SourceOperator, class OperatorFactory> -auto operator | (const rxcpp::grouped_observable<T, SourceOperator>& source, OperatorFactory&& of) +template<class K, class T, class SourceOperator, class OperatorFactory> +auto operator | (const rxcpp::grouped_observable<K, T, SourceOperator>& source, OperatorFactory&& of) -> decltype(source.op(std::forward<OperatorFactory>(of))) { return source.op(std::forward<OperatorFactory>(of)); } diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp index a4fb758..dcdc99d 100644 --- a/Rx/v2/src/rxcpp/rx-util.hpp +++ b/Rx/v2/src/rxcpp/rx-util.hpp @@ -214,6 +214,31 @@ inline auto pack() return detail::pack(); } +namespace detail { + +template<int Index> +struct take_at +{ + template<class... ParamN> + auto operator()(ParamN... pn) + -> decay_t<decltype(std::get<Index>(std::make_tuple(std::move(pn)...)))> { + return std::get<Index>(std::make_tuple(std::move(pn)...)); + } + template<class... ParamN> + auto operator()(ParamN... pn) const + -> decay_t<decltype(std::get<Index>(std::make_tuple(std::move(pn)...)))> { + return std::get<Index>(std::make_tuple(std::move(pn)...)); + } +}; + +} + +template<int Index> +inline auto take_at() + -> detail::take_at<Index> { + return detail::take_at<Index>(); +} + template <class D> struct resolve_type; |