diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-12-07 03:06:50 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-12-06 16:06:50 -0800 |
commit | 68ca4e5438edfcc54cc50b1c5090f799cfa192a4 (patch) | |
tree | ccf771d7da46763bb6baeb790e5d8f4076361f6e | |
parent | 1dd48669dacc5edafb64bb29afdf5fe22d2c0c6a (diff) | |
download | RxCpp-68ca4e5438edfcc54cc50b1c5090f799cfa192a4.tar.gz |
decouple map from observable (#281)
* decouple map from observable
* fix msvc compilation error
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-map.hpp | 77 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-includes.hpp | 1 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 26 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 8 | ||||
-rw-r--r-- | Rx/v2/test/operators/concat_map.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/flat_map.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/group_by.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/map.cpp | 191 | ||||
-rw-r--r-- | Rx/v2/test/operators/scan.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/subscribe_on.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/take_until.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/window.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/window_toggle.cpp | 1 | ||||
-rw-r--r-- | Rx/v2/test/subscriptions/subscription.cpp | 1 |
14 files changed, 266 insertions, 46 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp index 6db251f..b72b2b2 100644 --- a/Rx/v2/src/rxcpp/operators/rx-map.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp @@ -2,6 +2,21 @@ #pragma once +/*! \file rx-map.hpp + + \brief For each item from this observable use Selector to produce an item to emit from the new observable that is returned. + + \tparam Selector the type of the transforming function + + \param s the selector function + + \return Observable that emits the items from the source observable, transformed by the specified function. + + \sample + \snippet map.cpp map sample + \snippet output.txt map sample +*/ + #if !defined(RXCPP_OPERATORS_RX_MAP_HPP) #define RXCPP_OPERATORS_RX_MAP_HPP @@ -13,7 +28,16 @@ namespace operators { namespace detail { +template<class... AN> +struct map_invalid_arguments {}; +template<class... AN> +struct map_invalid : public rxo::operator_base<map_invalid_arguments<AN...>> { + using type = observable<map_invalid_arguments<AN...>, map_invalid<AN...>>; +}; +template<class... AN> +using map_invalid_t = typename map_invalid<AN...>::type; + template<class T, class Selector> struct map { @@ -33,7 +57,7 @@ struct map typedef map_observer<Subscriber> this_type; typedef decltype((*(select_type*)nullptr)(*(source_value_type*)nullptr)) value_type; typedef rxu::decay_t<Subscriber> dest_type; - typedef observer<T, this_type> observer_type; + typedef observer<source_value_type, this_type> observer_type; dest_type dest; mutable select_type selector; @@ -60,9 +84,9 @@ struct map dest.on_completed(); } - static subscriber<T, observer_type> make(dest_type d, select_type s) { + static subscriber<source_value_type, observer_type> make(dest_type d, select_type s) { auto cs = d.get_subscription(); - return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(s)))); + return make_subscriber<source_value_type>(std::move(cs), observer_type(this_type(std::move(d), std::move(s)))); } }; @@ -73,30 +97,41 @@ struct map } }; -template<class Selector> -class map_factory -{ - typedef rxu::decay_t<Selector> select_type; - select_type selector; -public: - map_factory(select_type s) : selector(std::move(s)) {} - template<class Observable> - auto operator()(Observable&& source) - -> decltype(source.template lift<rxu::value_type_t<map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector))) { - return source.template lift<rxu::value_type_t<map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector)); - } -}; - } -template<class Selector> -auto map(Selector&& p) - -> detail::map_factory<Selector> { - return detail::map_factory<Selector>(std::forward<Selector>(p)); +/*! @copydoc rx-map.hpp +*/ +template<class... AN> +auto map(AN&&... an) + -> operator_factory<map_tag, AN...> { + return operator_factory<map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); } } +template<> +struct member_overload<map_tag> +{ + template<class Observable, class Selector, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class ResolvedSelector = rxu::decay_t<Selector>, + class SourceValue = rxu::value_type_t<Observable>, + class Map = rxo::detail::map<SourceValue, ResolvedSelector>, + class Value = rxu::value_type_t<Map>> + static auto member(Observable&& o, Selector&& s) + -> decltype(o.template lift<Value>(Map(std::forward<Selector>(s)))) { + return o.template lift<Value>(Map(std::forward<Selector>(s))); + } + + template<class... AN> + static operators::detail::map_invalid_t<AN...> member(const AN...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "map takes Selector"); + } +}; + } #endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 0089085..29ff2e6 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -194,6 +194,7 @@ #include "operators/rx-finally.hpp" #include "operators/rx-group_by.hpp" #include "operators/rx-ignore_elements.hpp" +#include "operators/rx-map.hpp" #include "operators/rx-reduce.hpp" #include "operators/rx-with_latest_from.hpp" #include "operators/rx-zip.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 8c3fb70..7d37f45 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1102,25 +1102,15 @@ public: return lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s))); } - /*! For each item from this observable use Selector to produce an item to emit from the new observable that is returned. - - \tparam Selector the type of the transforming function - - \param s the selector function - - \return Observable that emits the items from the source observable, transformed by the specified function. - - \sample - \snippet map.cpp map sample - \snippet output.txt map sample - */ - template<class Selector> - auto map(Selector s) const - /// \cond SHOW_SERVICE_MEMBERS - -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)))) - /// \endcond + /*! @copydoc rx-map.hpp + */ + template<class... AN> + auto map(AN&&... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...)) + /// \endcond { - return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s))); + return observable_member(map_tag{}, *this, std::forward<AN>(an)...); } /*! @copydoc rx-debounce.hpp diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index f92de1b..3f920b8 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -104,7 +104,6 @@ public: #include "operators/rx-connect_forever.hpp" #include "operators/rx-flat_map.hpp" #include "operators/rx-lift.hpp" -#include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" #include "operators/rx-multicast.hpp" #include "operators/rx-observe_on.hpp" @@ -227,6 +226,13 @@ struct ignore_elements_tag { }; }; +struct map_tag { + template<class Included> + struct include_header{ + static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-map.hpp>"); + }; +}; + class empty_error: public std::runtime_error { public: diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp index 531a638..2e891dc 100644 --- a/Rx/v2/test/operators/concat_map.cpp +++ b/Rx/v2/test/operators/concat_map.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-filter.hpp> +#include <rxcpp/operators/rx-map.hpp> static const int static_tripletCount = 100; diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp index 55af15b..e90b3ea 100644 --- a/Rx/v2/test/operators/flat_map.cpp +++ b/Rx/v2/test/operators/flat_map.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-reduce.hpp> #include <rxcpp/operators/rx-filter.hpp> +#include <rxcpp/operators/rx-map.hpp> static const int static_tripletCount = 100; diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp index a4a3c3b..8363de7 100644 --- a/Rx/v2/test/operators/group_by.cpp +++ b/Rx/v2/test/operators/group_by.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-group_by.hpp> #include <rxcpp/operators/rx-reduce.hpp> +#include <rxcpp/operators/rx-map.hpp> #include <locale> diff --git a/Rx/v2/test/operators/map.cpp b/Rx/v2/test/operators/map.cpp index d672d7e..c96cc8d 100644 --- a/Rx/v2/test/operators/map.cpp +++ b/Rx/v2/test/operators/map.cpp @@ -1,7 +1,8 @@ #include "../test.h" +#include <rxcpp/operators/rx-map.hpp> -SCENARIO("map stops on completion", "[map][operators]"){ - GIVEN("a test hot observable of ints"){ +SCENARIO("map stops on completion", "[map][operators]") { + GIVEN("a test hot observable of ints") { auto sc = rxsc::make_test(); auto w = sc.create_worker(); const rxsc::test::messages<int> on; @@ -19,7 +20,7 @@ SCENARIO("map stops on completion", "[map][operators]"){ on.error(430, std::runtime_error("error on unsubscribed stream")) }); - WHEN("mapped to ints that are one larger"){ + WHEN("mapped to ints that are one larger") { auto res = w.start( [xs, &invoked]() { @@ -33,7 +34,7 @@ SCENARIO("map stops on completion", "[map][operators]"){ } ); - THEN("the output stops on completion"){ + THEN("the output stops on completion") { auto required = rxu::to_vector({ on.next(210, 3), on.next(240, 4), @@ -45,7 +46,7 @@ SCENARIO("map stops on completion", "[map][operators]"){ REQUIRE(required == actual); } - THEN("there was one subscription and one unsubscription"){ + THEN("there was one subscription and one unsubscription") { auto required = rxu::to_vector({ on.subscribe(200, 400) }); @@ -53,9 +54,187 @@ SCENARIO("map stops on completion", "[map][operators]"){ REQUIRE(required == actual); } - THEN("map was called until completed"){ + THEN("map was called until completed") { REQUIRE(4 == invoked); } } } } + +SCENARIO("map - never", "[map][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("values are mapped") { + + auto res = w.start( + [xs]() { + return xs + | rxo::map([](int x) { + return x + 1; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + | rxo::as_dynamic(); + } + ); + + THEN("the output is empty") { + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("map - empty", "[map][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("values are mapped") { + + auto res = w.start( + [xs]() { + return xs + .map([](int x) { + return x + 1; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains complete message") { + auto required = rxu::to_vector({ + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("map - items emitted", "[map][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("values are mapped") { + + auto res = w.start( + [xs]() { + return xs + .map([](int x) { + return x + 1; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains items sent while subscribed") { + auto required = rxu::to_vector({ + on.next(210, 3), + on.next(240, 4), + on.completed(300) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("map - throw", "[map][operators]") { + GIVEN("a source") { + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("map on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("values are mapped") { + + auto res = w.start( + [xs]() { + return xs + .map([](int x) { + return x + 1; + }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains only error") { + auto required = rxu::to_vector({ + on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source") { + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} diff --git a/Rx/v2/test/operators/scan.cpp b/Rx/v2/test/operators/scan.cpp index b7a7276..430d971 100644 --- a/Rx/v2/test/operators/scan.cpp +++ b/Rx/v2/test/operators/scan.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include <rxcpp/operators/rx-map.hpp> SCENARIO("scan: issue 41", "[scan][operators][issue][hide]"){ GIVEN("map of scan of interval"){ diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp index 30d4cdc..80e9b2c 100644 --- a/Rx/v2/test/operators/subscribe_on.cpp +++ b/Rx/v2/test/operators/subscribe_on.cpp @@ -1,5 +1,6 @@ #include "../test.h" #include <rxcpp/operators/rx-reduce.hpp> +#include <rxcpp/operators/rx-map.hpp> static const int static_subscriptions = 50000; diff --git a/Rx/v2/test/operators/take_until.cpp b/Rx/v2/test/operators/take_until.cpp index e821f81..92e2fa8 100644 --- a/Rx/v2/test/operators/take_until.cpp +++ b/Rx/v2/test/operators/take_until.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include <rxcpp/operators/rx-map.hpp> SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){ GIVEN("2 sources"){ diff --git a/Rx/v2/test/operators/window.cpp b/Rx/v2/test/operators/window.cpp index 30f109e..8d022df 100644 --- a/Rx/v2/test/operators/window.cpp +++ b/Rx/v2/test/operators/window.cpp @@ -1,6 +1,7 @@ #include "../test.h" #include <rxcpp/operators/rx-reduce.hpp> +#include <rxcpp/operators/rx-map.hpp> SCENARIO("window count, basic", "[window][operators]"){ GIVEN("1 hot observable of ints."){ diff --git a/Rx/v2/test/operators/window_toggle.cpp b/Rx/v2/test/operators/window_toggle.cpp index 0c1cc9f..08dc210 100644 --- a/Rx/v2/test/operators/window_toggle.cpp +++ b/Rx/v2/test/operators/window_toggle.cpp @@ -1,4 +1,5 @@ #include "../test.h" +#include <rxcpp/operators/rx-map.hpp> SCENARIO("window toggle, basic", "[window_toggle][operators]"){ GIVEN("1 hot observable of ints and hot observable of opens."){ diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp index 7aa6973..9ae26dc 100644 --- a/Rx/v2/test/subscriptions/subscription.cpp +++ b/Rx/v2/test/subscriptions/subscription.cpp @@ -1,5 +1,6 @@ #include "../test.h" #include "rxcpp/operators/rx-combine_latest.hpp" +#include "rxcpp/operators/rx-map.hpp" SCENARIO("observe subscription", "[hide]"){ GIVEN("observable of ints"){ |