diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2017-01-18 22:07:02 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-18 16:08:48 -0800 |
commit | 3ada27ecd97f762cb9b3465f1757fdda1b87b9f2 (patch) | |
tree | b8733f4b6d864cc71fc017558e9d963356ac0a35 | |
parent | dda07ac5ef860f384463a3f6b27e4d5096a45532 (diff) | |
download | RxCpp-3ada27ecd97f762cb9b3465f1757fdda1b87b9f2.tar.gz |
decouple concat_map from observable
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-concat_map.hpp | 158 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 57 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 10 | ||||
-rw-r--r-- | Rx/v2/test/operators/concat_map.cpp | 155 |
5 files changed, 281 insertions, 100 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp index d7b32c8..91ccfa5 100644 --- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp @@ -2,6 +2,32 @@ #pragma once +/*! \file rx-concat_map.hpp + + \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. + For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. + + \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature: observable(concat_map::source_value_type) + \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature: concat_map::value_type(concat_map::source_value_type, concat_map::collection_value_type) + \tparam Coordination the type of the scheduler (optional). + + \param s a function that returns an observable for each item emitted by the source observable. + \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional). + \param cn the scheduler to synchronize sources from different contexts. (optional). + + \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. + + Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables. + + \sample + \snippet concat_map.cpp concat_map sample + \snippet output.txt concat_map sample + + \sample + \snippet concat_map.cpp threaded concat_map sample + \snippet output.txt threaded concat_map sample +*/ + #if !defined(RXCPP_OPERATORS_RX_CONCATMAP_HPP) #define RXCPP_OPERATORS_RX_CONCATMAP_HPP @@ -13,6 +39,16 @@ namespace operators { namespace detail { +template<class... AN> +struct concat_map_invalid_arguments {}; + +template<class... AN> +struct concat_map_invalid : public rxo::operator_base<concat_map_invalid_arguments<AN...>> { + using type = observable<concat_map_invalid_arguments<AN...>, concat_map_invalid<AN...>>; +}; +template<class... AN> +using concat_map_invalid_t = typename concat_map_invalid<AN...>::type; + template<class Observable, class CollectionSelector, class ResultSelector, class Coordination> struct concat_traits { typedef rxu::decay_t<Observable> source_type; @@ -233,54 +269,96 @@ private: concat_map& operator=(const concat_map&) RXCPP_DELETE; }; -template<class CollectionSelector, class ResultSelector, class Coordination> -class concat_map_factory -{ - typedef rxu::decay_t<CollectionSelector> collection_selector_type; - typedef rxu::decay_t<ResultSelector> result_selector_type; - typedef rxu::decay_t<Coordination> coordination_type; - - collection_selector_type selectorCollection; - result_selector_type selectorResult; - coordination_type coordination; -public: - concat_map_factory(collection_selector_type s, result_selector_type rs, coordination_type sf) - : selectorCollection(std::move(s)) - , selectorResult(std::move(rs)) - , coordination(std::move(sf)) - { - } - - template<class Observable> - auto operator()(Observable&& source) - -> observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>> { - return observable<rxu::value_type_t<concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>, concat_map<Observable, CollectionSelector, ResultSelector, Coordination>>( - concat_map<Observable, CollectionSelector, ResultSelector, Coordination>(std::forward<Observable>(source), selectorCollection, selectorResult, coordination)); - } -}; - } -template<class CollectionSelector, class ResultSelector, class Coordination> -auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) - -> detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination> { - return detail::concat_map_factory<CollectionSelector, ResultSelector, Coordination>(std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(sf)); +/*! @copydoc rx-concat_map.hpp +*/ +template<class... AN> +auto concat_map(AN&&... an) +-> operator_factory<concat_map_tag, AN...> { + return operator_factory<concat_map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } -template<class CollectionSelector, class Coordination, class CheckC = typename std::enable_if<is_coordination<Coordination>::value>::type> -auto concat_map(CollectionSelector&& s, Coordination&& sf) - -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination> { - return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, Coordination>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), std::forward<Coordination>(sf)); } -template<class CollectionSelector> -auto concat_map(CollectionSelector&& s) - -> detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker> { - return detail::concat_map_factory<CollectionSelector, rxu::detail::take_at<1>, identity_one_worker>(std::forward<CollectionSelector>(s), rxu::take_at<1>(), identity_current_thread()); -} +template<> +struct member_overload<concat_map_tag> +{ + template<class Observable, class CollectionSelector, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class ResultSelectorType = rxu::detail::take_at<1>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, identity_one_worker>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), identity_current_thread())); + } + template<class Observable, class CollectionSelector, class Coordination, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class ResultSelectorType = rxu::detail::take_at<1>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>, + is_coordination<Coordination>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, ResultSelectorType, rxu::decay_t<Coordination>>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), ResultSelectorType(), std::forward<Coordination>(cn))); + } -} + template<class Observable, class CollectionSelector, class ResultSelector, + class IsCoordination = is_coordination<ResultSelector>, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>, + rxu::negation<IsCoordination>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, identity_one_worker>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class ResultSelectorType = rxu::decay_t<ResultSelector>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread())); + } + + template<class Observable, class CollectionSelector, class ResultSelector, class Coordination, + class CollectionSelectorType = rxu::decay_t<CollectionSelector>, + class SourceValue = rxu::value_type_t<Observable>, + class CollectionType = rxu::result_of_t<CollectionSelectorType(SourceValue)>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables<Observable, CollectionType>, + is_coordination<Coordination>>, + class ConcatMap = rxo::detail::concat_map<rxu::decay_t<Observable>, rxu::decay_t<CollectionSelector>, rxu::decay_t<ResultSelector>, rxu::decay_t<Coordination>>, + class CollectionValueType = rxu::value_type_t<CollectionType>, + class ResultSelectorType = rxu::decay_t<ResultSelector>, + class Value = rxu::result_of_t<ResultSelectorType(SourceValue, CollectionValueType)>, + class Result = observable<Value, ConcatMap> + > + static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) { + return Result(ConcatMap(std::forward<Observable>(o), std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn))); + } + + template<class... AN> + static operators::detail::concat_map_invalid_t<AN...> member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "concat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)"); + } +}; } diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 1d47997..ac70f5b 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -190,6 +190,7 @@ #include "operators/rx-buffer_time_count.hpp" #include "operators/rx-combine_latest.hpp" #include "operators/rx-concat.hpp" +#include "operators/rx-concat_map.hpp" #include "operators/rx-debounce.hpp" #include "operators/rx-delay.hpp" #include "operators/rx-distinct.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 9f90663..c035f91 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1139,60 +1139,15 @@ public: return observable_member(concat_tag{}, *this, std::forward<AN>(an)...); } - /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. - For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. - - \tparam CollectionSelector the type of the observable producing function - \tparam ResultSelector the type of the aggregation function - - \param s a function that returns an observable for each item emitted by the source observable - \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable - - \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. - - Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables. - - \sample - \snippet concat_map.cpp concat_map sample - \snippet output.txt concat_map sample - */ - template<class CollectionSelector, class ResultSelector> - auto concat_map(CollectionSelector&& s, ResultSelector&& rs) const - /// \cond SHOW_SERVICE_MEMBERS - -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>> - /// \endcond - { - return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>>( - rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, identity_one_worker>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), identity_current_thread())); - } - - /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. - For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. - - \tparam CollectionSelector the type of the observable producing function - \tparam ResultSelector the type of the aggregation function - \tparam Coordination the type of the scheduler - - \param s a function that returns an observable for each item emitted by the source observable - \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable - \param cn the scheduler to synchronize sources from different contexts. - - \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. - - Observables, produced by the CollectionSelector, are concatenated. There is another operator rxcpp::observable<T,SourceType>::flat_map that works similar but merges the observables. - - \sample - \snippet concat_map.cpp threaded concat_map sample - \snippet output.txt threaded concat_map sample - */ - template<class CollectionSelector, class ResultSelector, class Coordination> - auto concat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const + /*! @copydoc rx-concat_map.hpp + */ + template<class... AN> + auto concat_map(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>> + -> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) /// \endcond { - return observable<rxu::value_type_t<rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>, rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>>( - rxo::detail::concat_map<this_type, CollectionSelector, ResultSelector, Coordination>(*this, std::forward<CollectionSelector>(s), std::forward<ResultSelector>(rs), std::forward<Coordination>(cn))); + return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-with_latest_from.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index e5a3eaf..5a70f26 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -95,7 +95,6 @@ public: } -#include "operators/rx-concat_map.hpp" #include "operators/rx-connect_forever.hpp" #include "operators/rx-flat_map.hpp" #include "operators/rx-lift.hpp" @@ -169,7 +168,14 @@ struct concat_tag { struct include_header{ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-concat.hpp>"); }; -}; +}; + +struct concat_map_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-concat_map.hpp>"); + }; +}; struct debounce_tag { template<class Included> diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp index 8bf242a..761fdff 100644 --- a/Rx/v2/test/operators/concat_map.cpp +++ b/Rx/v2/test/operators/concat_map.cpp @@ -3,6 +3,7 @@ #include <rxcpp/operators/rx-filter.hpp> #include <rxcpp/operators/rx-map.hpp> #include <rxcpp/operators/rx-take.hpp> +#include <rxcpp/operators/rx-concat_map.hpp> static const int static_tripletCount = 100; @@ -223,13 +224,13 @@ SCENARIO("concat_map completes", "[concat_map][map][operators]"){ auto res = w.start( [&]() { return xs - .concat_map( + | rxo::concat_map( [&](int){ return ys;}, [](int, std::string s){ return s;}) // forget type to workaround lambda deduction bug on msvc 2013 - .as_dynamic(); + | rxo::as_dynamic(); } ); @@ -267,19 +268,19 @@ SCENARIO("concat_map completes", "[concat_map][map][operators]"){ } } - WHEN("streamed each int is mapped to the strings"){ + WHEN("each int is mapped to the strings with coordinator"){ auto res = w.start( [&]() { - return xs >> - rxo::concat_map( + return xs + .concat_map( [&](int){ return ys;}, [](int, std::string s){ return s;}, - rx::identity_current_thread()) >> + rx::identity_current_thread()) // forget type to workaround lambda deduction bug on msvc 2013 - rxo::as_dynamic(); + .as_dynamic(); } ); @@ -319,3 +320,143 @@ SCENARIO("concat_map completes", "[concat_map][map][operators]"){ } } +SCENARIO("concat_map, no result selector, no coordination", "[concat_map][map][operators]"){ + GIVEN("two cold observables. one of ints. one of strings."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> i_on; + const rxsc::test::messages<std::string> s_on; + + auto xs = sc.make_cold_observable({ + i_on.next(100, 4), + i_on.next(200, 2), + i_on.completed(500) + }); + + auto ys = sc.make_cold_observable({ + s_on.next(50, "foo"), + s_on.next(100, "bar"), + s_on.next(150, "baz"), + s_on.next(200, "qux"), + s_on.completed(250) + }); + + WHEN("each int is mapped to the strings"){ + + auto res = w.start( + [&]() { + return xs + .concat_map( + [&](int){ + return ys;}) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains strings repeated for each int"){ + auto required = rxu::to_vector({ + s_on.next(350, "foo"), + s_on.next(400, "bar"), + s_on.next(450, "baz"), + s_on.next(500, "qux"), + s_on.next(600, "foo"), + s_on.next(650, "bar"), + s_on.next(700, "baz"), + s_on.next(750, "qux"), + s_on.completed(800) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ints"){ + auto required = rxu::to_vector({ + i_on.subscribe(200, 700) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there were 2 subscription and unsubscription to the strings"){ + auto required = rxu::to_vector({ + s_on.subscribe(300, 550), + s_on.subscribe(550, 800) + }); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("concat_map, no result selector, with coordination", "[concat_map][map][operators]"){ + GIVEN("two cold observables. one of ints. one of strings."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> i_on; + const rxsc::test::messages<std::string> s_on; + + auto xs = sc.make_cold_observable({ + i_on.next(100, 4), + i_on.next(200, 2), + i_on.completed(500) + }); + + auto ys = sc.make_cold_observable({ + s_on.next(50, "foo"), + s_on.next(100, "bar"), + s_on.next(150, "baz"), + s_on.next(200, "qux"), + s_on.completed(250) + }); + + WHEN("each int is mapped to the strings"){ + + auto res = w.start( + [&]() { + return xs + .concat_map( + [&](int){ + return ys;}, + rx::identity_current_thread()) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains strings repeated for each int"){ + auto required = rxu::to_vector({ + s_on.next(350, "foo"), + s_on.next(400, "bar"), + s_on.next(450, "baz"), + s_on.next(500, "qux"), + s_on.next(600, "foo"), + s_on.next(650, "bar"), + s_on.next(700, "baz"), + s_on.next(750, "qux"), + s_on.completed(800) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ints"){ + auto required = rxu::to_vector({ + i_on.subscribe(200, 700) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there were 2 subscription and unsubscription to the strings"){ + auto required = rxu::to_vector({ + s_on.subscribe(300, 550), + s_on.subscribe(550, 800) + }); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + } + } +} |