summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Rx/v2/examples/doxygen/distinct.cpp15
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-distinct.hpp81
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp16
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/distinct.cpp297
-rw-r--r--projects/CMake/CMakeLists.txt1
-rw-r--r--projects/doxygen/CMakeLists.txt1
9 files changed, 414 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/distinct.cpp b/Rx/v2/examples/doxygen/distinct.cpp
new file mode 100644
index 0000000..20443fb
--- /dev/null
+++ b/Rx/v2/examples/doxygen/distinct.cpp
@@ -0,0 +1,15 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("distinct sample"){
+ printf("//! [distinct sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2, 2, 3, 3, 3, 4, 5, 5).distinct();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [distinct sample]\n");
+}
+
diff --git a/Rx/v2/src/rxcpp/operators/rx-distinct.hpp b/Rx/v2/src/rxcpp/operators/rx-distinct.hpp
new file mode 100644
index 0000000..deb93e5
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-distinct.hpp
@@ -0,0 +1,81 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_DISTINCT_HPP)
+#define RXCPP_OPERATORS_RX_DISTINCT_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T>
+struct distinct
+{
+ typedef rxu::decay_t<T> source_value_type;
+
+ template<class Subscriber>
+ struct distinct_observer
+ {
+ typedef distinct_observer<Subscriber> this_type;
+ typedef source_value_type value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+ dest_type dest;
+ mutable std::unordered_set<source_value_type> remembered;
+
+ distinct_observer(dest_type d)
+ : dest(d)
+ {
+ }
+ void on_next(source_value_type v) const {
+ if (remembered.empty() || remembered.count(v) == 0) {
+ remembered.insert(v);
+ dest.on_next(v);
+ }
+ }
+ void on_error(std::exception_ptr e) const {
+ dest.on_error(e);
+ }
+ void on_completed() const {
+ dest.on_completed();
+ }
+
+ static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) {
+ return make_subscriber<value_type>(d, this_type(d));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(distinct_observer<Subscriber>::make(std::move(dest))) {
+ return distinct_observer<Subscriber>::make(std::move(dest));
+ }
+};
+
+class distinct_factory
+{
+public:
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(distinct<rxu::decay_t<Observable>>::value_type)) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(distinct<rxu::decay_t<Observable>>::value_type);
+ }
+};
+
+}
+
+inline auto distinct()
+-> detail::distinct_factory {
+ return detail::distinct_factory();
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index c352a84..85a1358 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -130,6 +130,7 @@
#include <initializer_list>
#include <typeinfo>
#include <tuple>
+#include <unordered_set>
#if defined(RXCPP_ON_IOS) || defined(RXCPP_ON_ANDROID)
#include <pthread.h>
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 2bb5b14..802b637 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -838,6 +838,22 @@ public:
return lift<rxu::value_type_t<rxo::detail::map<T, Selector>>>(rxo::detail::map<T, Selector>(std::move(s)));
}
+ /*! For each item from this observable, filter out repeated values and emit only items that have not already been emitted.
+
+ \return Observable that emits those items from the source observable that are distinct.
+
+ \sample
+ \snippet distinct.cpp distinct sample
+ \snippet output.txt distinct sample
+ */
+ auto distinct() const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::distinct<T>()))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::distinct<T>());
+ }
+
/*! For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned.
\return Observable that emits those items from the source observable that are distinct from their immediate predecessors.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 78f1fb7..914b94a 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -42,6 +42,7 @@ namespace rxo=operators;
#include "operators/rx-concat.hpp"
#include "operators/rx-concat_map.hpp"
#include "operators/rx-connect_forever.hpp"
+#include "operators/rx-distinct.hpp"
#include "operators/rx-distinct_until_changed.hpp"
#include "operators/rx-filter.hpp"
#include "operators/rx-finally.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index e1a17b3..3f08d74 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -33,6 +33,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/combine_latest.cpp
${TEST_DIR}/operators/concat.cpp
${TEST_DIR}/operators/concat_map.cpp
+ ${TEST_DIR}/operators/distinct.cpp
${TEST_DIR}/operators/distinct_until_changed.cpp
${TEST_DIR}/operators/filter.cpp
${TEST_DIR}/operators/flat_map.cpp
diff --git a/Rx/v2/test/operators/distinct.cpp b/Rx/v2/test/operators/distinct.cpp
new file mode 100644
index 0000000..e53e069
--- /dev/null
+++ b/Rx/v2/test/operators/distinct.cpp
@@ -0,0 +1,297 @@
+#include "../test.h"
+
+SCENARIO("distinct - never", "[distinct][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.distinct();
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("distinct - empty", "[distinct][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.distinct();
+ }
+ );
+
+ THEN("the output only contains complete message"){
+ auto required = rxu::to_vector({
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("distinct - return", "[distinct][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.completed(250)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.distinct();
+ }
+ );
+
+ THEN("the output only contains distinct items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("distinct - throw", "[distinct][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("distinct on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.distinct();
+ }
+ );
+
+ THEN("the output only contains only error"){
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("distinct - all changes", "[distinct][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.distinct();
+ }
+ );
+
+ THEN("the output only contains distinct items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("distinct - all same", "[distinct][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 2),
+ on.next(230, 2),
+ on.next(240, 2),
+ on.completed(250)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.distinct();
+ }
+ );
+
+ THEN("the output only contains distinct items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("distinct - some changes", "[distinct][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2), //*
+ on.next(215, 3), //*
+ on.next(220, 3),
+ on.next(225, 2),
+ on.next(230, 2),
+ on.next(230, 1), //*
+ on.next(240, 2),
+ on.completed(250)
+ });
+
+ WHEN("distinct values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.distinct();
+ }
+ );
+
+ THEN("the output only contains distinct items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2), //*
+ on.next(215, 3), //*
+ on.next(230, 1), //*
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
diff --git a/projects/CMake/CMakeLists.txt b/projects/CMake/CMakeLists.txt
index 8d1fe07..e6f3d68 100644
--- a/projects/CMake/CMakeLists.txt
+++ b/projects/CMake/CMakeLists.txt
@@ -34,6 +34,7 @@ set(RX_SOURCES
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-concat.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-connect_forever.hpp
+ ${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-distinct.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-filter.hpp
${RXCPP_DIR}/Rx/v2/src/rxcpp/operators/rx-finally.hpp
diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt
index 46f469e..a228e2b 100644
--- a/projects/doxygen/CMakeLists.txt
+++ b/projects/doxygen/CMakeLists.txt
@@ -51,6 +51,7 @@ if(DOXYGEN_FOUND)
${DOXY_EXAMPLES_SRC_DIR}/concat_map.cpp
${DOXY_EXAMPLES_SRC_DIR}/create.cpp
${DOXY_EXAMPLES_SRC_DIR}/defer.cpp
+ ${DOXY_EXAMPLES_SRC_DIR}/distinct.cpp
${DOXY_EXAMPLES_SRC_DIR}/distinct_until_changed.cpp
${DOXY_EXAMPLES_SRC_DIR}/empty.cpp
${DOXY_EXAMPLES_SRC_DIR}/error.cpp