diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-12-09 23:48:45 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2016-12-09 14:30:44 -0800 |
commit | 2ac078e3b0c78785134a195c26e7b55a6758f3c7 (patch) | |
tree | 3cf3abd0726885c47f60311cd7a55a0b7eab9873 /Rx/v2 | |
parent | 0cf6cacc8114b9e503b674bdf21042892f73fb3a (diff) | |
download | RxCpp-2ac078e3b0c78785134a195c26e7b55a6758f3c7.tar.gz |
add an optional BinaryPredicate to distinct_until_changed for element comparison
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp | 50 | ||||
-rw-r--r-- | Rx/v2/test/operators/distinct_until_changed.cpp | 54 |
2 files changed, 92 insertions, 12 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp index 761418e..7d8ef5a 100644 --- a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp @@ -6,6 +6,10 @@ \brief For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned. + \tparam BinaryPredicate (optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b); + + \param pred (optional) the function that implements comparison of two values. + \return Observable that emits those items from the source observable that are distinct from their immediate predecessors. \sample @@ -34,10 +38,18 @@ struct distinct_until_changed_invalid : public rxo::operator_base<distinct_until template<class... AN> using distinct_until_changed_invalid_t = typename distinct_until_changed_invalid<AN...>::type; -template<class T> +template<class T, class BinaryPredicate> struct distinct_until_changed { typedef rxu::decay_t<T> source_value_type; + typedef rxu::decay_t<BinaryPredicate> predicate_type; + + predicate_type pred; + + distinct_until_changed(predicate_type p) + : pred(std::move(p)) + { + } template<class Subscriber> struct distinct_until_changed_observer @@ -46,15 +58,18 @@ struct distinct_until_changed typedef source_value_type value_type; typedef rxu::decay_t<Subscriber> dest_type; typedef observer<value_type, this_type> observer_type; + dest_type dest; + predicate_type pred; mutable rxu::detail::maybe<source_value_type> remembered; - distinct_until_changed_observer(dest_type d) - : dest(d) + distinct_until_changed_observer(dest_type d, predicate_type pred) + : dest(std::move(d)) + , pred(std::move(pred)) { } void on_next(source_value_type v) const { - if (remembered.empty() || !(v == remembered.get())) { + if (remembered.empty() || !pred(v, remembered.get())) { remembered.reset(v); dest.on_next(v); } @@ -66,15 +81,15 @@ struct distinct_until_changed dest.on_completed(); } - static subscriber<value_type, observer_type> make(dest_type d) { - return make_subscriber<value_type>(d, this_type(d)); + static subscriber<value_type, observer_type> make(dest_type d, predicate_type p) { + return make_subscriber<value_type>(d, this_type(d, std::move(p))); } }; template<class Subscriber> auto operator()(Subscriber dest) const - -> decltype(distinct_until_changed_observer<Subscriber>::make(std::move(dest))) { - return distinct_until_changed_observer<Subscriber>::make(std::move(dest)); + -> decltype(distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred)) { + return distinct_until_changed_observer<Subscriber>::make(std::move(dest), pred); } }; @@ -97,17 +112,28 @@ struct member_overload<distinct_until_changed_tag> class SourceValue = rxu::value_type_t<Observable>, class Enabled = rxu::enable_if_all_true_type_t< is_observable<Observable>>, - class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue>> + class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, rxu::equal_to<>>> static auto member(Observable&& o) - -> decltype(o.template lift<SourceValue>(DistinctUntilChanged())) { - return o.template lift<SourceValue>(DistinctUntilChanged()); + -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>()))) { + return o.template lift<SourceValue>(DistinctUntilChanged(rxu::equal_to<>())); + } + + template<class Observable, + class BinaryPredicate, + class SourceValue = rxu::value_type_t<Observable>, + class Enabled = rxu::enable_if_all_true_type_t< + is_observable<Observable>>, + class DistinctUntilChanged = rxo::detail::distinct_until_changed<SourceValue, BinaryPredicate>> + static auto member(Observable&& o, BinaryPredicate&& pred) + -> decltype(o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred)))) { + return o.template lift<SourceValue>(DistinctUntilChanged(std::forward<BinaryPredicate>(pred))); } template<class... AN> static operators::detail::distinct_until_changed_invalid_t<AN...> member(AN...) { std::terminate(); return {}; - static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes no arguments"); + static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes (optional BinaryPredicate)"); } }; diff --git a/Rx/v2/test/operators/distinct_until_changed.cpp b/Rx/v2/test/operators/distinct_until_changed.cpp index da56204..c27a41a 100644 --- a/Rx/v2/test/operators/distinct_until_changed.cpp +++ b/Rx/v2/test/operators/distinct_until_changed.cpp @@ -361,3 +361,57 @@ SCENARIO("distinct_until_changed - custom type", "[distinct_until_changed][opera } } } + +SCENARIO("distinct_until_changed - custom predicate", "[distinct_until_changed][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), //* + on.next(215, 3), //* + on.next(220, 3), + on.next(225, 2), //* + on.next(230, 2), + on.next(230, 1), //* + on.next(240, 2), //* + on.completed(250) + }); + + WHEN("distinct values are taken"){ + + auto res = w.start( + [xs]() { + return xs + .distinct_until_changed([](int x, int y) { return x == y; }) + // forget type to workaround lambda deduction bug on msvc 2013 + .as_dynamic(); + } + ); + + THEN("the output only contains distinct items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(210, 2), //* + on.next(215, 3), //* + on.next(225, 2), //* + on.next(230, 1), //* + on.next(240, 2), //* + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} |