From 1a5d65be7548069c4df12750f5d8f78bb852c6c2 Mon Sep 17 00:00:00 2001 From: Grigoriy Chudnov Date: Thu, 19 Jan 2017 23:00:10 +0300 Subject: decouple flat_map from observable --- Rx/v2/src/rxcpp/operators/rx-flat_map.hpp | 159 +++++++++++++++++++------- Rx/v2/src/rxcpp/rx-includes.hpp | 1 + Rx/v2/src/rxcpp/rx-observable.hpp | 57 +--------- Rx/v2/src/rxcpp/rx-operators.hpp | 8 +- Rx/v2/test/operators/flat_map.cpp | 181 ++++++++++++++++++++++++++++-- 5 files changed, 306 insertions(+), 100 deletions(-) diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp index 7f5aae1..371d8fd 100644 --- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp @@ -2,6 +2,32 @@ #pragma once +/*! \file rx-flat_map.hpp + + \brief For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. + For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. + + \tparam CollectionSelector the type of the observable producing function. CollectionSelector must be a function with the signature observable(flat_map::source_value_type) + \tparam ResultSelector the type of the aggregation function (optional). ResultSelector must be a function with the signature flat_map::value_type(flat_map::source_value_type, flat_map::collection_value_type). + \tparam Coordination the type of the scheduler (optional). + + \param s a function that returns an observable for each item emitted by the source observable. + \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable (optional). + \param cn the scheduler to synchronize sources from different contexts (optional). + + \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. + + Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable::flat_map that works similar but concatenates the observables. + + \sample + \snippet flat_map.cpp flat_map sample + \snippet output.txt flat_map sample + + \sample + \snippet flat_map.cpp threaded flat_map sample + \snippet output.txt threaded flat_map sample +*/ + #if !defined(RXCPP_OPERATORS_RX_FLATMAP_HPP) #define RXCPP_OPERATORS_RX_FLATMAP_HPP @@ -13,6 +39,16 @@ namespace operators { namespace detail { +template +struct flat_map_invalid_arguments {}; + +template +struct flat_map_invalid : public rxo::operator_base> { + using type = observable, flat_map_invalid>; +}; +template +using flat_map_invalid_t = typename flat_map_invalid::type; + template struct flat_map_traits { typedef rxu::decay_t source_type; @@ -200,53 +236,96 @@ struct flat_map } }; -template -class flat_map_factory -{ - typedef rxu::decay_t collection_selector_type; - typedef rxu::decay_t result_selector_type; - typedef rxu::decay_t coordination_type; - - collection_selector_type selectorCollection; - result_selector_type selectorResult; - coordination_type coordination; -public: - flat_map_factory(collection_selector_type s, result_selector_type rs, coordination_type sf) - : selectorCollection(std::move(s)) - , selectorResult(std::move(rs)) - , coordination(std::move(sf)) - { - } - - template - auto operator()(Observable&& source) - -> observable>, flat_map> { - return observable>, flat_map>( - flat_map(std::forward(source), selectorCollection, selectorResult, coordination)); - } -}; - } -template -auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& sf) - -> detail::flat_map_factory { - return detail::flat_map_factory(std::forward(s), std::forward(rs), std::forward(sf)); +/*! @copydoc rx-flat_map.hpp +*/ +template +auto flat_map(AN&&... an) +-> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); } -template::value>::type> -auto flat_map(CollectionSelector&& s, Coordination&& sf) - -> detail::flat_map_factory, Coordination> { - return detail::flat_map_factory, Coordination>(std::forward(s), rxu::take_at<1>(), std::forward(sf)); } -template -auto flat_map(CollectionSelector&& s) - -> detail::flat_map_factory, identity_one_worker> { - return detail::flat_map_factory, identity_one_worker>(std::forward(s), rxu::take_at<1>(), identity_current_thread()); -} +template<> +struct member_overload +{ + template, + class SourceValue = rxu::value_type_t, + class CollectionType = rxu::result_of_t, + class ResultSelectorType = rxu::detail::take_at<1>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables>, + class FlatMap = rxo::detail::flat_map, rxu::decay_t, ResultSelectorType, identity_one_worker>, + class CollectionValueType = rxu::value_type_t, + class Value = rxu::result_of_t, + class Result = observable + > + static Result member(Observable&& o, CollectionSelector&& s) { + return Result(FlatMap(std::forward(o), std::forward(s), ResultSelectorType(), identity_current_thread())); + } -} + template, + class SourceValue = rxu::value_type_t, + class CollectionType = rxu::result_of_t, + class ResultSelectorType = rxu::detail::take_at<1>, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables, + is_coordination>, + class FlatMap = rxo::detail::flat_map, rxu::decay_t, ResultSelectorType, rxu::decay_t>, + class CollectionValueType = rxu::value_type_t, + class Value = rxu::result_of_t, + class Result = observable + > + static Result member(Observable&& o, CollectionSelector&& s, Coordination&& cn) { + return Result(FlatMap(std::forward(o), std::forward(s), ResultSelectorType(), std::forward(cn))); + } + + template, + class CollectionSelectorType = rxu::decay_t, + class SourceValue = rxu::value_type_t, + class CollectionType = rxu::result_of_t, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables, + rxu::negation>, + class FlatMap = rxo::detail::flat_map, rxu::decay_t, rxu::decay_t, identity_one_worker>, + class CollectionValueType = rxu::value_type_t, + class ResultSelectorType = rxu::decay_t, + class Value = rxu::result_of_t, + class Result = observable + > + static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs) { + return Result(FlatMap(std::forward(o), std::forward(s), std::forward(rs), identity_current_thread())); + } + + template, + class SourceValue = rxu::value_type_t, + class CollectionType = rxu::result_of_t, + class Enabled = rxu::enable_if_all_true_type_t< + all_observables, + is_coordination>, + class FlatMap = rxo::detail::flat_map, rxu::decay_t, rxu::decay_t, rxu::decay_t>, + class CollectionValueType = rxu::value_type_t, + class ResultSelectorType = rxu::decay_t, + class Value = rxu::result_of_t, + class Result = observable + > + static Result member(Observable&& o, CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) { + return Result(FlatMap(std::forward(o), std::forward(s), std::forward(rs), std::forward(cn))); + } + + template + static operators::detail::flat_map_invalid_t member(AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "flat_map takes (CollectionSelector, optional ResultSelector, optional Coordination)"); + } +}; } diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index ac70f5b..bc2a435 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -198,6 +198,7 @@ #include "operators/rx-element_at.hpp" #include "operators/rx-filter.hpp" #include "operators/rx-finally.hpp" +#include "operators/rx-flat_map.hpp" #include "operators/rx-group_by.hpp" #include "operators/rx-ignore_elements.hpp" #include "operators/rx-map.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index c035f91..9afceae 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1072,60 +1072,15 @@ public: return observable_member(amb_tag{}, *this, std::forward(an)...); } - /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. - For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. - - \tparam CollectionSelector the type of the observable producing function - \tparam ResultSelector the type of the aggregation function - - \param s a function that returns an observable for each item emitted by the source observable - \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable - - \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. - - Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable::concat_map that works similar but concatenates the observables. - - \sample - \snippet flat_map.cpp flat_map sample - \snippet output.txt flat_map sample - */ - template - auto flat_map(CollectionSelector&& s, ResultSelector&& rs) const - /// \cond SHOW_SERVICE_MEMBERS - -> observable>, rxo::detail::flat_map> - /// \endcond - { - return observable>, rxo::detail::flat_map>( - rxo::detail::flat_map(*this, std::forward(s), std::forward(rs), identity_current_thread())); - } - - /*! For each item from this observable use the CollectionSelector to produce an observable and subscribe to that observable. - For each item from all of the produced observables use the ResultSelector to produce a value to emit from the new observable that is returned. - - \tparam CollectionSelector the type of the observable producing function - \tparam ResultSelector the type of the aggregation function - \tparam Coordination the type of the scheduler - - \param s a function that returns an observable for each item emitted by the source observable - \param rs a function that combines one item emitted by each of the source and collection observables and returns an item to be emitted by the resulting observable - \param cn the scheduler to synchronize sources from different contexts. - - \return Observable that emits the results of applying a function to a pair of values emitted by the source observable and the collection observable. - - Observables, produced by the CollectionSelector, are merged. There is another operator rxcpp::observable::concat_map that works similar but concatenates the observables. - - \sample - \snippet flat_map.cpp threaded flat_map sample - \snippet output.txt threaded flat_map sample - */ - template - auto flat_map(CollectionSelector&& s, ResultSelector&& rs, Coordination&& cn) const + /*! @copydoc rx-flat_map.hpp + */ + template + auto flat_map(AN&&... an) const /// \cond SHOW_SERVICE_MEMBERS - -> observable>, rxo::detail::flat_map> + -> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward(an)...)) /// \endcond { - return observable>, rxo::detail::flat_map>( - rxo::detail::flat_map(*this, std::forward(s), std::forward(rs), std::forward(cn))); + return observable_member(flat_map_tag{}, *this, std::forward(an)...); } /*! @copydoc rx-concat.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 5a70f26..9fb23de 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -96,7 +96,6 @@ public: } #include "operators/rx-connect_forever.hpp" -#include "operators/rx-flat_map.hpp" #include "operators/rx-lift.hpp" #include "operators/rx-multicast.hpp" #include "operators/rx-observe_on.hpp" @@ -226,6 +225,13 @@ struct finally_tag { }; }; +struct flat_map_tag { + template + struct include_header{ + static_assert(Included::value, "missing include: please #include "); + }; +}; + struct group_by_tag { template struct include_header{ diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp index 724df65..03cb36e 100644 --- a/Rx/v2/test/operators/flat_map.cpp +++ b/Rx/v2/test/operators/flat_map.cpp @@ -3,6 +3,7 @@ #include #include #include +#include static const int static_tripletCount = 100; @@ -259,13 +260,13 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ auto res = w.start( [&]() { return xs - .flat_map( + | rxo::flat_map( [&](int){ return ys;}, [](int, std::string s){ return s;}) // forget type to workaround lambda deduction bug on msvc 2013 - .as_dynamic(); + | rxo::as_dynamic(); } ); @@ -313,19 +314,19 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ } } - WHEN("streamed, each int is mapped to the strings"){ + WHEN("each int is mapped to the strings with coordinator"){ auto res = w.start( [&]() { - return xs >> - rxo::flat_map( + return xs + .flat_map( [&](int){ return ys;}, [](int, std::string s){ return s;}, - rx::identity_current_thread()) >> + rx::identity_current_thread()) // forget type to workaround lambda deduction bug on msvc 2013 - rxo::as_dynamic(); + .as_dynamic(); } ); @@ -375,7 +376,6 @@ SCENARIO("flat_map completes", "[flat_map][map][operators]"){ } } - SCENARIO("flat_map source never ends", "[flat_map][map][operators]"){ GIVEN("two cold observables. one of ints. one of strings."){ auto sc = rxsc::make_test(); @@ -536,3 +536,168 @@ SCENARIO("flat_map inner error", "[flat_map][map][operators]"){ } } } + +SCENARIO("flat_map, no result selector, no coordination", "[flat_map][map][operators]"){ + GIVEN("two cold observables. one of ints. one of strings."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages i_on; + const rxsc::test::messages s_on; + + auto xs = sc.make_cold_observable({ + i_on.next(100, 4), + i_on.next(200, 2), + i_on.next(300, 3), + i_on.next(400, 1), + i_on.completed(500) + }); + + auto ys = sc.make_cold_observable({ + s_on.next(50, "foo"), + s_on.next(100, "bar"), + s_on.next(150, "baz"), + s_on.next(200, "qux"), + s_on.completed(250) + }); + + WHEN("each int is mapped to the strings"){ + + auto res = w.start( + [&]() { + return xs + .flat_map( + [&](int){ + return ys;}) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains strings repeated for each int"){ + auto required = rxu::to_vector({ + s_on.next(350, "foo"), + s_on.next(400, "bar"), + s_on.next(450, "baz"), + s_on.next(450, "foo"), + s_on.next(500, "qux"), + s_on.next(500, "bar"), + s_on.next(550, "baz"), + s_on.next(550, "foo"), + s_on.next(600, "qux"), + s_on.next(600, "bar"), + s_on.next(650, "baz"), + s_on.next(650, "foo"), + s_on.next(700, "qux"), + s_on.next(700, "bar"), + s_on.next(750, "baz"), + s_on.next(800, "qux"), + s_on.completed(850) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ints"){ + auto required = rxu::to_vector({ + i_on.subscribe(200, 700) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there were four subscription and unsubscription to the strings"){ + auto required = rxu::to_vector({ + s_on.subscribe(300, 550), + s_on.subscribe(400, 650), + s_on.subscribe(500, 750), + s_on.subscribe(600, 850) + }); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("flat_map, no result selector, with coordination", "[flat_map][map][operators]"){ + GIVEN("two cold observables. one of ints. one of strings."){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages i_on; + const rxsc::test::messages s_on; + + auto xs = sc.make_cold_observable({ + i_on.next(100, 4), + i_on.next(200, 2), + i_on.next(300, 3), + i_on.next(400, 1), + i_on.completed(500) + }); + + auto ys = sc.make_cold_observable({ + s_on.next(50, "foo"), + s_on.next(100, "bar"), + s_on.next(150, "baz"), + s_on.next(200, "qux"), + s_on.completed(250) + }); + + WHEN("each int is mapped to the strings"){ + + auto res = w.start( + [&]() { + return xs + .flat_map( + [&](int){ + return ys;}, + rx::identity_current_thread()) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output contains strings repeated for each int"){ + auto required = rxu::to_vector({ + s_on.next(350, "foo"), + s_on.next(400, "bar"), + s_on.next(450, "baz"), + s_on.next(450, "foo"), + s_on.next(500, "qux"), + s_on.next(500, "bar"), + s_on.next(550, "baz"), + s_on.next(550, "foo"), + s_on.next(600, "qux"), + s_on.next(600, "bar"), + s_on.next(650, "baz"), + s_on.next(650, "foo"), + s_on.next(700, "qux"), + s_on.next(700, "bar"), + s_on.next(750, "baz"), + s_on.next(800, "qux"), + s_on.completed(850) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription to the ints"){ + auto required = rxu::to_vector({ + i_on.subscribe(200, 700) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + THEN("there were four subscription and unsubscription to the strings"){ + auto required = rxu::to_vector({ + s_on.subscribe(300, 550), + s_on.subscribe(400, 650), + s_on.subscribe(500, 750), + s_on.subscribe(600, 850) + }); + auto actual = ys.subscriptions(); + REQUIRE(required == actual); + } + } + } +} \ No newline at end of file -- cgit v1.2.3