summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2016-03-04 17:44:22 +0300
committerGrigoriy Chudnov <g.chudnov@gmail.com>2016-03-04 17:44:22 +0300
commitdccee6d3525fa2f5220a3e6fadd466888f37bbf0 (patch)
tree9460eab358aca367a447e80c7bf31998434789c3 /Rx/v2
parent8433dfe942088a572dba79518ba36427921d5b9f (diff)
downloadRxCpp-dccee6d3525fa2f5220a3e6fadd466888f37bbf0.tar.gz
add ignore_elements operator
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/examples/doxygen/ignore_elements.cpp15
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp79
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp17
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp3
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/ignore_elements.cpp160
6 files changed, 274 insertions, 1 deletions
diff --git a/Rx/v2/examples/doxygen/ignore_elements.cpp b/Rx/v2/examples/doxygen/ignore_elements.cpp
new file mode 100644
index 0000000..86fc2df
--- /dev/null
+++ b/Rx/v2/examples/doxygen/ignore_elements.cpp
@@ -0,0 +1,15 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("ignore_elements sample"){
+ printf("//! [ignore_elements sample]\n");
+ auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).ignore_elements();
+ values.
+ subscribe(
+ [](int v) { printf("OnNext: %d\n", v); },
+ []() { printf("OnCompleted\n"); });
+ printf("//! [ignore_elements sample]\n");
+}
+
diff --git a/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp b/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp
new file mode 100644
index 0000000..f92ac1f
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp
@@ -0,0 +1,79 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_IGNORE_ELEMENTS_HPP)
+#define RXCPP_OPERATORS_RX_IGNORE_ELEMENTS_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T>
+struct ignore_elements {
+ typedef rxu::decay_t<T> source_value_type;
+
+ template<class Subscriber>
+ struct ignore_elements_observer
+ {
+ typedef ignore_elements_observer<Subscriber> this_type;
+ typedef source_value_type value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+ dest_type dest;
+
+ ignore_elements_observer(dest_type d)
+ : dest(d)
+ {
+ }
+
+ void on_next(source_value_type) const {
+ // no-op; ignore element
+ }
+
+ void on_error(std::exception_ptr e) const {
+ dest.on_error(e);
+ }
+
+ void on_completed() const {
+ dest.on_completed();
+ }
+
+ static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) {
+ return make_subscriber<value_type>(d, this_type(d));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(ignore_elements_observer<Subscriber>::make(std::move(dest))) {
+ return ignore_elements_observer<Subscriber>::make(std::move(dest));
+ }
+};
+
+class ignore_elements_factory
+{
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(ignore_elements<rxu::value_type_t<rxu::decay_t<Observable>>>())) {
+ return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(ignore_elements<rxu::value_type_t<rxu::decay_t<Observable>>>());
+ }
+};
+
+}
+
+
+inline auto ignore_elements()
+ -> detail::ignore_elements_factory {
+ return detail::ignore_elements_factory();
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index fcbc377..68ff550 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -2034,6 +2034,23 @@ public:
return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less()));
}
+ /*! Do not emit any items from the source Observable, but allow termination notification (either onError or onCompleted) to pass through unchanged.
+
+ \return Observable that emits termination notification from the source observable.
+
+ \sample
+ \snippet ignore_elements.cpp ignore_elements sample
+ \snippet output.txt ignore_elements sample
+ */
+ auto ignore_elements() const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::ignore_elements<T>()))
+ /// \endcond
+ {
+ return lift<T>(rxo::detail::ignore_elements<T>());
+ }
+
+
/// \cond SHOW_SERVICE_MEMBERS
/// multicast ->
/// allows connections to the source to be independent of subscriptions
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 90b3e29..b6b5518 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -44,11 +44,12 @@ namespace rxo=operators;
#include "operators/rx-connect_forever.hpp"
#include "operators/rx-distinct.hpp"
#include "operators/rx-distinct_until_changed.hpp"
-#include "rxcpp/operators/rx-element_at.hpp"
+#include "operators/rx-element_at.hpp"
#include "operators/rx-filter.hpp"
#include "operators/rx-finally.hpp"
#include "operators/rx-flat_map.hpp"
#include "operators/rx-group_by.hpp"
+#include "operators/rx-ignore_elements.hpp"
#include "operators/rx-lift.hpp"
#include "operators/rx-map.hpp"
#include "operators/rx-merge.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 57fe3e0..39558e4 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -39,6 +39,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/filter.cpp
${TEST_DIR}/operators/flat_map.cpp
${TEST_DIR}/operators/group_by.cpp
+ ${TEST_DIR}/operators/ignore_elements.cpp
${TEST_DIR}/operators/lift.cpp
${TEST_DIR}/operators/map.cpp
${TEST_DIR}/operators/merge.cpp
diff --git a/Rx/v2/test/operators/ignore_elements.cpp b/Rx/v2/test/operators/ignore_elements.cpp
new file mode 100644
index 0000000..7e495e9
--- /dev/null
+++ b/Rx/v2/test/operators/ignore_elements.cpp
@@ -0,0 +1,160 @@
+#include "../test.h"
+
+
+SCENARIO("ignore_elements - never", "[ignore_elements][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1)
+ });
+
+ WHEN("ignore_elements is applied"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.ignore_elements();
+ }
+ );
+
+ THEN("the output is empty"){
+ auto required = std::vector<rxsc::test::messages<int>::recorded_type>();
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 1000)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("ignore_elements - empty", "[ignore_elements][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("ignore_elements is applied"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.ignore_elements();
+ }
+ );
+
+ THEN("the output contains the completion message"){
+ auto required = rxu::to_vector({
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("ignore_elements - throw", "[ignore_elements][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("ignore_elements on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("ignore_elements is applied"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.ignore_elements();
+ }
+ );
+
+ THEN("the output contains an error"){
+ auto required = rxu::to_vector({
+ on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("ignore_elements - items", "[ignore_elements][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("ignore_elements is applied"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs.ignore_elements();
+ }
+ );
+
+ THEN("the output contains the completion message"){
+ auto required = rxu::to_vector({
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}