summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-12-07 03:06:50 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-06 16:06:50 -0800
commit68ca4e5438edfcc54cc50b1c5090f799cfa192a4 (patch)
treeccf771d7da46763bb6baeb790e5d8f4076361f6e
parent1dd48669dacc5edafb64bb29afdf5fe22d2c0c6a (diff)
downloadRxCpp-68ca4e5438edfcc54cc50b1c5090f799cfa192a4.tar.gz
decouple map from observable (#281)
* decouple map from observable * fix msvc compilation error
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-map.hpp77
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp26
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp8
-rw-r--r--Rx/v2/test/operators/concat_map.cpp1
-rw-r--r--Rx/v2/test/operators/flat_map.cpp1
-rw-r--r--Rx/v2/test/operators/group_by.cpp1
-rw-r--r--Rx/v2/test/operators/map.cpp191
-rw-r--r--Rx/v2/test/operators/scan.cpp1
-rw-r--r--Rx/v2/test/operators/subscribe_on.cpp1
-rw-r--r--Rx/v2/test/operators/take_until.cpp1
-rw-r--r--Rx/v2/test/operators/window.cpp1
-rw-r--r--Rx/v2/test/operators/window_toggle.cpp1
-rw-r--r--Rx/v2/test/subscriptions/subscription.cpp1
14 files changed, 266 insertions, 46 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp
index 6db251f..b72b2b2 100644
--- a/Rx/v2/src/rxcpp/operators/rx-map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp
@@ -2,6 +2,21 @@
#pragma once
+/*! \file rx-map.hpp
+
+ \brief For each item from this observable use Selector to produce an item to emit from the new observable that is returned.
+
+ \tparam Selector the type of the transforming function
+
+ \param s the selector function
+
+ \return Observable that emits the items from the source observable, transformed by the specified function.
+
+ \sample
+ \snippet map.cpp map sample
+ \snippet output.txt map sample
+*/
+
#if !defined(RXCPP_OPERATORS_RX_MAP_HPP)
#define RXCPP_OPERATORS_RX_MAP_HPP
@@ -13,7 +28,16 @@ namespace operators {
namespace detail {
+template<class... AN>
+struct map_invalid_arguments {};
+template<class... AN>
+struct map_invalid : public rxo::operator_base<map_invalid_arguments<AN...>> {
+ using type = observable<map_invalid_arguments<AN...>, map_invalid<AN...>>;
+};
+template<class... AN>
+using map_invalid_t = typename map_invalid<AN...>::type;
+
template<class T, class Selector>
struct map
{
@@ -33,7 +57,7 @@ struct map
typedef map_observer<Subscriber> this_type;
typedef decltype((*(select_type*)nullptr)(*(source_value_type*)nullptr)) value_type;
typedef rxu::decay_t<Subscriber> dest_type;
- typedef observer<T, this_type> observer_type;
+ typedef observer<source_value_type, this_type> observer_type;
dest_type dest;
mutable select_type selector;
@@ -60,9 +84,9 @@ struct map
dest.on_completed();
}
- static subscriber<T, observer_type> make(dest_type d, select_type s) {
+ static subscriber<source_value_type, observer_type> make(dest_type d, select_type s) {
auto cs = d.get_subscription();
- return make_subscriber<T>(std::move(cs), observer_type(this_type(std::move(d), std::move(s))));
+ return make_subscriber<source_value_type>(std::move(cs), observer_type(this_type(std::move(d), std::move(s))));
}
};
@@ -73,30 +97,41 @@ struct map
}
};
-template<class Selector>
-class map_factory
-{
- typedef rxu::decay_t<Selector> select_type;
- select_type selector;
-public:
- map_factory(select_type s) : selector(std::move(s)) {}
- template<class Observable>
- auto operator()(Observable&& source)
- -> decltype(source.template lift<rxu::value_type_t<map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector))) {
- return source.template lift<rxu::value_type_t<map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>>>(map<rxu::value_type_t<rxu::decay_t<Observable>>, select_type>(selector));
- }
-};
-
}
-template<class Selector>
-auto map(Selector&& p)
- -> detail::map_factory<Selector> {
- return detail::map_factory<Selector>(std::forward<Selector>(p));
+/*! @copydoc rx-map.hpp
+*/
+template<class... AN>
+auto map(AN&&... an)
+ -> operator_factory<map_tag, AN...> {
+ return operator_factory<map_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}
}
+template<>
+struct member_overload<map_tag>
+{
+ template<class Observable, class Selector,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class ResolvedSelector = rxu::decay_t<Selector>,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Map = rxo::detail::map<SourceValue, ResolvedSelector>,
+ class Value = rxu::value_type_t<Map>>
+ static auto member(Observable&& o, Selector&& s)
+ -> decltype(o.template lift<Value>(Map(std::forward<Selector>(s)))) {
+ return o.template lift<Value>(Map(std::forward<Selector>(s)));
+ }
+
+ template<class... AN>
+ static operators::detail::map_invalid_t<AN...> member(const AN...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "map takes Selector");
+ }
+};
+
}
#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 0089085..29ff2e6 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -194,6 +194,7 @@
#include "operators/rx-finally.hpp"
#include "operators/rx-group_by.hpp"
#include "operators/rx-ignore_elements.hpp"
+#include "operators/rx-map.hpp"
#include "operators/rx-reduce.hpp"
#include "operators/rx-with_latest_from.hpp"
#include "operators/rx-zip.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 8c3fb70..7d37f45 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1102,25 +1102,15 @@ public:
return lift<rxu::value_type_t<rxo::detail::on_error_resume_next<T, Selector>>>(rxo::detail::on_error_resume_next<T, Selector>(std::move(s)));
}
- /*! For each item from this observable use Selector to produce an item to emit from the new observable that is returned.
-
- \tparam Selector the type of the transforming function
-
- \param s the selector function
-
- \return Observable that emits the items from the source observable, transformed by the specified function.
-
- \sample
- \snippet map.cpp map sample
- \snippet output.txt map sample
- */
- template<class Selector>
- auto map(Selector s) const
- /// \cond SHOW_SERVICE_MEMBERS
- -> decltype(EXPLICIT_THIS lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s))))
- /// \endcond
+ /*! @copydoc rx-map.hpp
+ */
+ template<class... AN>
+ auto map(AN&&... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
{
- return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)));
+ return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
}
/*! @copydoc rx-debounce.hpp
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index f92de1b..3f920b8 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -104,7 +104,6 @@ public:
#include "operators/rx-connect_forever.hpp"
#include "operators/rx-flat_map.hpp"
#include "operators/rx-lift.hpp"
-#include "operators/rx-map.hpp"
#include "operators/rx-merge.hpp"
#include "operators/rx-multicast.hpp"
#include "operators/rx-observe_on.hpp"
@@ -227,6 +226,13 @@ struct ignore_elements_tag {
};
};
+struct map_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-map.hpp>");
+ };
+};
+
class empty_error: public std::runtime_error
{
public:
diff --git a/Rx/v2/test/operators/concat_map.cpp b/Rx/v2/test/operators/concat_map.cpp
index 531a638..2e891dc 100644
--- a/Rx/v2/test/operators/concat_map.cpp
+++ b/Rx/v2/test/operators/concat_map.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-filter.hpp>
+#include <rxcpp/operators/rx-map.hpp>
static const int static_tripletCount = 100;
diff --git a/Rx/v2/test/operators/flat_map.cpp b/Rx/v2/test/operators/flat_map.cpp
index 55af15b..e90b3ea 100644
--- a/Rx/v2/test/operators/flat_map.cpp
+++ b/Rx/v2/test/operators/flat_map.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-filter.hpp>
+#include <rxcpp/operators/rx-map.hpp>
static const int static_tripletCount = 100;
diff --git a/Rx/v2/test/operators/group_by.cpp b/Rx/v2/test/operators/group_by.cpp
index a4a3c3b..8363de7 100644
--- a/Rx/v2/test/operators/group_by.cpp
+++ b/Rx/v2/test/operators/group_by.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-group_by.hpp>
#include <rxcpp/operators/rx-reduce.hpp>
+#include <rxcpp/operators/rx-map.hpp>
#include <locale>
diff --git a/Rx/v2/test/operators/map.cpp b/Rx/v2/test/operators/map.cpp
index d672d7e..c96cc8d 100644
--- a/Rx/v2/test/operators/map.cpp
+++ b/Rx/v2/test/operators/map.cpp
@@ -1,7 +1,8 @@
#include "../test.h"
+#include <rxcpp/operators/rx-map.hpp>
-SCENARIO("map stops on completion", "[map][operators]"){
- GIVEN("a test hot observable of ints"){
+SCENARIO("map stops on completion", "[map][operators]") {
+ GIVEN("a test hot observable of ints") {
auto sc = rxsc::make_test();
auto w = sc.create_worker();
const rxsc::test::messages<int> on;
@@ -19,7 +20,7 @@ SCENARIO("map stops on completion", "[map][operators]"){
on.error(430, std::runtime_error("error on unsubscribed stream"))
});
- WHEN("mapped to ints that are one larger"){
+ WHEN("mapped to ints that are one larger") {
auto res = w.start(
[xs, &invoked]() {
@@ -33,7 +34,7 @@ SCENARIO("map stops on completion", "[map][operators]"){
}
);
- THEN("the output stops on completion"){
+ THEN("the output stops on completion") {
auto required = rxu::to_vector({
on.next(210, 3),
on.next(240, 4),
@@ -45,7 +46,7 @@ SCENARIO("map stops on completion", "[map][operators]"){
REQUIRE(required == actual);
}
- THEN("there was one subscription and one unsubscription"){
+ THEN("there was one subscription and one unsubscription") {
auto required = rxu::to_vector({
on.subscribe(200, 400)
});
@@ -53,9 +54,187 @@ SCENARIO("map stops on completion", "[map][operators]"){
REQUIRE(required == actual);
}
- THEN("map was called until completed"){
+ THEN("map was called until completed") {
REQUIRE(4 == invoked);
}
}
}
}
+
+SCENARIO("map - never", "[map][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("values are mapped") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ | rxo::map([](int x) {
+ return x + 1;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ | rxo::as_dynamic();
+ }
+ );
+
+ THEN("the output is empty") {
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("map - empty", "[map][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("values are mapped") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .map([](int x) {
+ return x + 1;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains complete message") {
+ auto required = rxu::to_vector({
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("map - items emitted", "[map][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(240, 3),
+ on.completed(300)
+ });
+
+ WHEN("values are mapped") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .map([](int x) {
+ return x + 1;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed") {
+ auto required = rxu::to_vector({
+ on.next(210, 3),
+ on.next(240, 4),
+ on.completed(300)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 300)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("map - throw", "[map][operators]") {
+ GIVEN("a source") {
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("map on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("values are mapped") {
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .map([](int x) {
+ return x + 1;
+ })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains only error") {
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source") {
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
diff --git a/Rx/v2/test/operators/scan.cpp b/Rx/v2/test/operators/scan.cpp
index b7a7276..430d971 100644
--- a/Rx/v2/test/operators/scan.cpp
+++ b/Rx/v2/test/operators/scan.cpp
@@ -1,4 +1,5 @@
#include "../test.h"
+#include <rxcpp/operators/rx-map.hpp>
SCENARIO("scan: issue 41", "[scan][operators][issue][hide]"){
GIVEN("map of scan of interval"){
diff --git a/Rx/v2/test/operators/subscribe_on.cpp b/Rx/v2/test/operators/subscribe_on.cpp
index 30d4cdc..80e9b2c 100644
--- a/Rx/v2/test/operators/subscribe_on.cpp
+++ b/Rx/v2/test/operators/subscribe_on.cpp
@@ -1,5 +1,6 @@
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
+#include <rxcpp/operators/rx-map.hpp>
static const int static_subscriptions = 50000;
diff --git a/Rx/v2/test/operators/take_until.cpp b/Rx/v2/test/operators/take_until.cpp
index e821f81..92e2fa8 100644
--- a/Rx/v2/test/operators/take_until.cpp
+++ b/Rx/v2/test/operators/take_until.cpp
@@ -1,4 +1,5 @@
#include "../test.h"
+#include <rxcpp/operators/rx-map.hpp>
SCENARIO("take_until trigger on_next", "[take_until][take][operators]"){
GIVEN("2 sources"){
diff --git a/Rx/v2/test/operators/window.cpp b/Rx/v2/test/operators/window.cpp
index 30f109e..8d022df 100644
--- a/Rx/v2/test/operators/window.cpp
+++ b/Rx/v2/test/operators/window.cpp
@@ -1,6 +1,7 @@
#include "../test.h"
#include <rxcpp/operators/rx-reduce.hpp>
+#include <rxcpp/operators/rx-map.hpp>
SCENARIO("window count, basic", "[window][operators]"){
GIVEN("1 hot observable of ints."){
diff --git a/Rx/v2/test/operators/window_toggle.cpp b/Rx/v2/test/operators/window_toggle.cpp
index 0c1cc9f..08dc210 100644
--- a/Rx/v2/test/operators/window_toggle.cpp
+++ b/Rx/v2/test/operators/window_toggle.cpp
@@ -1,4 +1,5 @@
#include "../test.h"
+#include <rxcpp/operators/rx-map.hpp>
SCENARIO("window toggle, basic", "[window_toggle][operators]"){
GIVEN("1 hot observable of ints and hot observable of opens."){
diff --git a/Rx/v2/test/subscriptions/subscription.cpp b/Rx/v2/test/subscriptions/subscription.cpp
index 7aa6973..9ae26dc 100644
--- a/Rx/v2/test/subscriptions/subscription.cpp
+++ b/Rx/v2/test/subscriptions/subscription.cpp
@@ -1,5 +1,6 @@
#include "../test.h"
#include "rxcpp/operators/rx-combine_latest.hpp"
+#include "rxcpp/operators/rx-map.hpp"
SCENARIO("observe subscription", "[hide]"){
GIVEN("observable of ints"){