summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-12-09 23:48:45 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-09 14:30:44 -0800
commit2ac078e3b0c78785134a195c26e7b55a6758f3c7 (patch)
tree3cf3abd0726885c47f60311cd7a55a0b7eab9873 /Rx/v2
parent0cf6cacc8114b9e503b674bdf21042892f73fb3a (diff)
downloadRxCpp-2ac078e3b0c78785134a195c26e7b55a6758f3c7.tar.gz
add an optional BinaryPredicate to distinct_until_changed for element comparison
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp50
-rw-r--r--Rx/v2/test/operators/distinct_until_changed.cpp54
2 files changed, 92 insertions, 12 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
index 761418e..7d8ef5a 100644
--- a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
@@ -6,6 +6,10 @@
\brief For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned.
+ \tparam BinaryPredicate (optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b);
+
+ \param pred (optional) the function that implements comparison of two values.
+
\return Observable that emits those items from the source observable that are distinct from their immediate predecessors.
\sample
@@ -34,10 +38,18 @@ struct distinct_until_changed_invalid : public rxo::operator_base<distinct_until
template<class... AN>
using distinct_until_changed_invalid_t = typename distinct_until_changed_invalid<AN...>::type;
-template<class T>
+template<class T, class BinaryPredicate>
struct distinct_until_changed
{
typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<BinaryPredicate> predicate_type;
+
+ predicate_type pred;
+
+ distinct_until_changed(predicate_type p)
+ : pred(std::move(p))
+ {
+ }
template<class Subscriber>
struct distinct_until_changed_observer
@@ -46,15 +58,18 @@ struct distinct_until_changed
typedef source_value_type value_type;
typedef rxu::decay_t<Subscriber> dest_type;
typedef observer<value_type, this_type> observer_type;
+
dest_type dest;
+ predicate_type pred;
mutable rxu::detail::maybe<source_value_type> remembered;
- distinct_until_changed_observer(dest_type d)
- : dest(d)
+ distinct_until_changed_observer(dest_type d, predicate_type pred)
+ : dest(std::move(d))
+ , pred(std::move(pred))
{
}
void on_next(source_value_type v) const {
- if (remembered.empty() || !(v == remembered.get())) {
+ if (remembered.empty() || !pred(v, remembered.get())) {
remembered.reset(v);
dest.on_next(v);
}
@@ -66,15 +81,15 @@ struct distinct_until_changed
dest.on_completed();
}
- static subscriber<value_type, observer_type> make(dest_type d) {
- return make_subscriber<value_type>(d, this_type(d));
+ static subscriber<value_type, observer_type> make(dest_type d, predicate_type p) {
+ return make_subscriber<value_type>(d, this_type(d, std::move(p)));
}
};
template<class Subscriber>
auto operator()(Subscriber dest) const
- -> decltype(distinct_until_changed_observer<Subscriber>::make(std::move(dest))) {
- return distinct_until_changed_observer<Subscriber>::make(std::move(dest));
+ -> decltype(distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred)) {
+ return distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred);
}
};
@@ -97,17 +112,28 @@ struct member_overload<distinct_until_changed_tag>
class SourceValue = rxu::value_type_t<Observable>,
class Enabled = rxu::enable_if_all_true_type_t<
is_observable<Observable>>,
- class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue>>
+ class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, rxu::equal_to<>>>
static auto member(Observable&& o)
- -> decltype(o.template lift<SourceValue>(DistinctUntilChanged())) {
- return o.template lift<SourceValue>(DistinctUntilChanged());
+ -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()))) {
+ return o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()));
+ }
+
+ template<class Observable,
+ class BinaryPredicate,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class Enabled = rxu::enable_if_all_true_type_t<
+ is_observable<Observable>>,
+ class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, BinaryPredicate>>
+ static auto member(Observable&& o, BinaryPredicate&& pred)
+ -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)))) {
+ return o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)));
}
template<class... AN>
static operators::detail::distinct_until_changed_invalid_t<AN...> member(AN...) {
std::terminate();
return {};
- static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes no arguments");
+ static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes (optional BinaryPredicate)");
}
};
diff --git a/Rx/v2/test/operators/distinct_until_changed.cpp b/Rx/v2/test/operators/distinct_until_changed.cpp
index da56204..c27a41a 100644
--- a/Rx/v2/test/operators/distinct_until_changed.cpp
+++ b/Rx/v2/test/operators/distinct_until_changed.cpp
@@ -361,3 +361,57 @@ SCENARIO("distinct_until_changed - custom type", "[distinct_until_changed][opera
}
}
}
+
+SCENARIO("distinct_until_changed - custom predicate", "[distinct_until_changed][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2), //*
+ on.next(215, 3), //*
+ on.next(220, 3),
+ on.next(225, 2), //*
+ on.next(230, 2),
+ on.next(230, 1), //*
+ on.next(240, 2), //*
+ on.completed(250)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .distinct_until_changed([](int x, int y) { return x == y; })
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains distinct items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2), //*
+ on.next(215, 3), //*
+ on.next(225, 2), //*
+ on.next(230, 1), //*
+ on.next(240, 2), //*
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}