diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-03-04 17:44:22 +0300 |
---|---|---|
committer | Grigoriy Chudnov <g.chudnov@gmail.com> | 2016-03-04 17:44:22 +0300 |
commit | dccee6d3525fa2f5220a3e6fadd466888f37bbf0 (patch) | |
tree | 9460eab358aca367a447e80c7bf31998434789c3 /Rx/v2 | |
parent | 8433dfe942088a572dba79518ba36427921d5b9f (diff) | |
download | RxCpp-dccee6d3525fa2f5220a3e6fadd466888f37bbf0.tar.gz |
add ignore_elements operator
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/examples/doxygen/ignore_elements.cpp | 15 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp | 79 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 17 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-operators.hpp | 3 | ||||
-rw-r--r-- | Rx/v2/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | Rx/v2/test/operators/ignore_elements.cpp | 160 |
6 files changed, 274 insertions, 1 deletions
diff --git a/Rx/v2/examples/doxygen/ignore_elements.cpp b/Rx/v2/examples/doxygen/ignore_elements.cpp new file mode 100644 index 0000000..86fc2df --- /dev/null +++ b/Rx/v2/examples/doxygen/ignore_elements.cpp @@ -0,0 +1,15 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("ignore_elements sample"){ + printf("//! [ignore_elements sample]\n"); + auto values = rxcpp::observable<>::from(1, 2, 3, 4, 5).ignore_elements(); + values. + subscribe( + [](int v) { printf("OnNext: %d\n", v); }, + []() { printf("OnCompleted\n"); }); + printf("//! [ignore_elements sample]\n"); +} + diff --git a/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp b/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp new file mode 100644 index 0000000..f92ac1f --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp @@ -0,0 +1,79 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +#if !defined(RXCPP_OPERATORS_RX_IGNORE_ELEMENTS_HPP) +#define RXCPP_OPERATORS_RX_IGNORE_ELEMENTS_HPP + +#include "../rx-includes.hpp" + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template<class T> +struct ignore_elements { + typedef rxu::decay_t<T> source_value_type; + + template<class Subscriber> + struct ignore_elements_observer + { + typedef ignore_elements_observer<Subscriber> this_type; + typedef source_value_type value_type; + typedef rxu::decay_t<Subscriber> dest_type; + typedef observer<value_type, this_type> observer_type; + dest_type dest; + + ignore_elements_observer(dest_type d) + : dest(d) + { + } + + void on_next(source_value_type) const { + // no-op; ignore element + } + + void on_error(std::exception_ptr e) const { + dest.on_error(e); + } + + void on_completed() const { + dest.on_completed(); + } + + static subscriber<value_type, observer<value_type, this_type>> make(dest_type d) { + return make_subscriber<value_type>(d, this_type(d)); + } + }; + + template<class Subscriber> + auto operator()(Subscriber dest) const + -> decltype(ignore_elements_observer<Subscriber>::make(std::move(dest))) { + return ignore_elements_observer<Subscriber>::make(std::move(dest)); + } +}; + +class ignore_elements_factory +{ + template<class Observable> + auto operator()(Observable&& source) + -> decltype(source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(ignore_elements<rxu::value_type_t<rxu::decay_t<Observable>>>())) { + return source.template lift<rxu::value_type_t<rxu::decay_t<Observable>>>(ignore_elements<rxu::value_type_t<rxu::decay_t<Observable>>>()); + } +}; + +} + + +inline auto ignore_elements() + -> detail::ignore_elements_factory { + return detail::ignore_elements_factory(); +} + +} + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index fcbc377..68ff550 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -2034,6 +2034,23 @@ public: return lift<typename rxo::detail::group_by_traits<T, this_type, KeySelector, MarbleSelector, rxu::less>::grouped_observable_type>(rxo::detail::group_by<T, this_type, KeySelector, MarbleSelector, rxu::less>(std::move(ks), std::move(ms), rxu::less())); } + /*! Do not emit any items from the source Observable, but allow termination notification (either onError or onCompleted) to pass through unchanged. + + \return Observable that emits termination notification from the source observable. + + \sample + \snippet ignore_elements.cpp ignore_elements sample + \snippet output.txt ignore_elements sample + */ + auto ignore_elements() const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(EXPLICIT_THIS lift<T>(rxo::detail::ignore_elements<T>())) + /// \endcond + { + return lift<T>(rxo::detail::ignore_elements<T>()); + } + + /// \cond SHOW_SERVICE_MEMBERS /// multicast -> /// allows connections to the source to be independent of subscriptions diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index 90b3e29..b6b5518 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -44,11 +44,12 @@ namespace rxo=operators; #include "operators/rx-connect_forever.hpp" #include "operators/rx-distinct.hpp" #include "operators/rx-distinct_until_changed.hpp" -#include "rxcpp/operators/rx-element_at.hpp" +#include "operators/rx-element_at.hpp" #include "operators/rx-filter.hpp" #include "operators/rx-finally.hpp" #include "operators/rx-flat_map.hpp" #include "operators/rx-group_by.hpp" +#include "operators/rx-ignore_elements.hpp" #include "operators/rx-lift.hpp" #include "operators/rx-map.hpp" #include "operators/rx-merge.hpp" diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index 57fe3e0..39558e4 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -39,6 +39,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/filter.cpp ${TEST_DIR}/operators/flat_map.cpp ${TEST_DIR}/operators/group_by.cpp + ${TEST_DIR}/operators/ignore_elements.cpp ${TEST_DIR}/operators/lift.cpp ${TEST_DIR}/operators/map.cpp ${TEST_DIR}/operators/merge.cpp diff --git a/Rx/v2/test/operators/ignore_elements.cpp b/Rx/v2/test/operators/ignore_elements.cpp new file mode 100644 index 0000000..7e495e9 --- /dev/null +++ b/Rx/v2/test/operators/ignore_elements.cpp @@ -0,0 +1,160 @@ +#include "../test.h" + + +SCENARIO("ignore_elements - never", "[ignore_elements][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("ignore_elements is applied"){ + + auto res = w.start( + [xs]() { + return xs.ignore_elements(); + } + ); + + THEN("the output is empty"){ + auto required = std::vector<rxsc::test::messages<int>::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1000) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("ignore_elements - empty", "[ignore_elements][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("ignore_elements is applied"){ + + auto res = w.start( + [xs]() { + return xs.ignore_elements(); + } + ); + + THEN("the output contains the completion message"){ + auto required = rxu::to_vector({ + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("ignore_elements - throw", "[ignore_elements][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + std::runtime_error ex("ignore_elements on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("ignore_elements is applied"){ + + auto res = w.start( + [xs]() { + return xs.ignore_elements(); + } + ); + + THEN("the output contains an error"){ + auto required = rxu::to_vector({ + on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("ignore_elements - items", "[ignore_elements][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(220, 3), + on.next(230, 4), + on.next(240, 5), + on.completed(250) + }); + + WHEN("ignore_elements is applied"){ + + auto res = w.start( + [xs]() { + return xs.ignore_elements(); + } + ); + + THEN("the output contains the completion message"){ + auto required = rxu::to_vector({ + on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} |