summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorMartin Kodovský <martin.kodovsky@gmail.com>2017-12-14 06:02:38 +0100
committerKirk Shoop <kirk.shoop@microsoft.com>2017-12-13 21:02:38 -0800
commitb84db4278e54e722fbbae794f573d1142261e9a3 (patch)
tree42cb893608f8df99da9fbd5366ad89fd5a78c24a /Rx/v2
parent815e92158e3e0647b96d1331de1ecc5badcde3f8 (diff)
downloadRxCpp-b84db4278e54e722fbbae794f573d1142261e9a3.tar.gz
Add: Skip_while operator (#418)
* #378 adding skip_while operator * #378 adding tests + fixing CMakeLists.txt * #378 test completion * #378 fix includes * #378 fix assignment in a condition * #378 fix assignment in a condition 2
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/examples/doxygen/skip_while.cpp17
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-skip_while.hpp131
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp1
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp11
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp7
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/skip_while.cpp434
7 files changed, 602 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/skip_while.cpp b/Rx/v2/examples/doxygen/skip_while.cpp
new file mode 100644
index 0000000..a98ce18
--- /dev/null
+++ b/Rx/v2/examples/doxygen/skip_while.cpp
@@ -0,0 +1,17 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("skip_while sample"){
+ printf("//! [skip_while sample]\n");
+ auto values = rxcpp::observable<>::range(1, 8).
+ skip_while([](int v){
+ return v <= 4;
+ });
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [skip_while sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_while.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_while.hpp
new file mode 100644
index 0000000..fdd06b6
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-skip_while.hpp
@@ -0,0 +1,131 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+/*! \file rx-skip_while.hpp
+
+ \brief Discards the first items fulfilling the predicate from this observable emit them from the new observable that is returned.
+
+ \tparam Predicate the type of the predicate
+
+ \param t the predicate
+
+ \return An observable that discards the first items until condition emitted by the source Observable is not fulfilling the predicate, or all of the items from the source observable if the predicate never returns false
+
+ \sample
+ \snippet skip_while.cpp skip_while sample
+ \snippet output.txt skip_while sample
+*/
+
+#if !defined(RXCPP_OPERATORS_RX_SKIP_WHILE_HPP)
+#define RXCPP_OPERATORS_RX_SKIP_WHILE_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class... AN>
+struct skip_while_invalid_arguments {};
+
+template<class... AN>
+struct skip_while_invalid : public rxo::operator_base<skip_while_invalid_arguments<AN...>> {
+ using type = observable<skip_while_invalid_arguments<AN...>, skip_while_invalid<AN...>>;
+};
+template<class... AN>
+using skip_while_invalid_t = typename skip_while_invalid<AN...>::type;
+
+template<class T, class Predicate>
+struct skip_while
+{
+ typedef rxu::decay_t<T> source_value_type;
+ typedef rxu::decay_t<Predicate> test_type;
+ test_type test;
+
+
+ skip_while(test_type t)
+ : test(std::move(t))
+ {
+ }
+
+ template<class Subscriber>
+ struct skip_while_observer
+ {
+ typedef skip_while_observer<Subscriber> this_type;
+ typedef source_value_type value_type;
+ typedef rxu::decay_t<Subscriber> dest_type;
+ typedef observer<value_type, this_type> observer_type;
+ dest_type dest;
+ test_type test;
+ bool pass;
+
+ skip_while_observer(dest_type d, test_type t)
+ : dest(std::move(d))
+ , test(std::move(t)),
+ pass(false)
+ {
+ }
+ void on_next(source_value_type v) {
+ if(pass || !test(v))
+ {
+ pass = true;
+ dest.on_next(v);
+ }
+ }
+ void on_error(std::exception_ptr e) const {
+ dest.on_error(e);
+ }
+ void on_completed() const {
+ dest.on_completed();
+ }
+
+ static subscriber<value_type, observer_type> make(dest_type d, test_type t) {
+ return make_subscriber<value_type>(d, this_type(d, std::move(t)));
+ }
+ };
+
+ template<class Subscriber>
+ auto operator()(Subscriber dest) const
+ -> decltype(skip_while_observer<Subscriber>::make(std::move(dest), test)) {
+ return skip_while_observer<Subscriber>::make(std::move(dest), test);
+ }
+};
+
+}
+
+/*! @copydoc rx-skip_while.hpp
+*/
+template<class... AN>
+auto skip_while(AN&&... an)
+ -> operator_factory<skip_while_tag, AN...> {
+ return operator_factory<skip_while_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
+ }
+
+}
+
+template<>
+struct member_overload<skip_while_tag>
+{
+ template<class Observable, class Predicate,
+ class SourceValue = rxu::value_type_t<Observable>,
+ class TakeWhile = rxo::detail::skip_while<SourceValue, rxu::decay_t<Predicate>>>
+ static auto member(Observable&& o, Predicate&& p)
+ -> decltype(o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p)))) {
+ return o.template lift<SourceValue>(TakeWhile(std::forward<Predicate>(p)));
+ }
+
+ template<class... AN>
+ static operators::detail::skip_while_invalid_t<AN...> member(const AN&...) {
+ std::terminate();
+ return {};
+ static_assert(sizeof...(AN) == 10000, "skip_while takes (Predicate)");
+ }
+};
+
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index aefa190..5104565 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -221,6 +221,7 @@
#include "operators/rx-scan.hpp"
#include "operators/rx-sequence_equal.hpp"
#include "operators/rx-skip.hpp"
+#include "operators/rx-skip_while.hpp"
#include "operators/rx-skip_last.hpp"
#include "operators/rx-skip_until.hpp"
#include "operators/rx-start_with.hpp"
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 3a31240..496db0b 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1361,6 +1361,17 @@ public:
return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
}
+ /*! @copydoc rx-skip.hpp
+ */
+ template<class... AN>
+ auto skip_while(AN... an) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
+ /// \endcond
+ {
+ return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
+ }
+
/*! @copydoc rx-skip_last.hpp
*/
template<class... AN>
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 15e8b54..9a5481a 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -377,6 +377,13 @@ struct skip_tag {
};
};
+struct skip_while_tag {
+ template<class Included>
+ struct include_header{
+ static_assert(Included::value, "missing include: please #include <rxcpp/operators/rx-skip_while.hpp>");
+ };
+};
+
struct skip_last_tag {
template<class Included>
struct include_header{
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 1064c0f..dcb998f 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -67,6 +67,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/scan.cpp
${TEST_DIR}/operators/sequence_equal.cpp
${TEST_DIR}/operators/skip.cpp
+ ${TEST_DIR}/operators/skip_while.cpp
${TEST_DIR}/operators/skip_last.cpp
${TEST_DIR}/operators/skip_until.cpp
${TEST_DIR}/operators/start_with.cpp
diff --git a/Rx/v2/test/operators/skip_while.cpp b/Rx/v2/test/operators/skip_while.cpp
new file mode 100644
index 0000000..cc773b1
--- /dev/null
+++ b/Rx/v2/test/operators/skip_while.cpp
@@ -0,0 +1,434 @@
+#include "../test.h"
+#include <rxcpp/operators/rx-skip_while.hpp>
+
+namespace {
+ class not_equal_to {
+ int value;
+ public:
+ not_equal_to(int value) : value(value) { }
+ bool operator()(int i) const {
+ return i != value;
+ }
+ };
+}
+
+SCENARIO("skip_while not equal to 4", "[skip_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("values before 4 are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_while(not_equal_to(4))
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip_while, complete after", "[skip_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.completed(690)
+ });
+
+ WHEN("none are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_while(not_equal_to(0))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output contains no items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.completed(690)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 690)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip_while, complete before", "[skip_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.completed(690)
+ });
+
+ WHEN("7 values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_while(not_equal_to(72))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.completed(690)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 690)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip_while, error after", "[skip_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("skip_while on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.error(690, ex)
+ });
+
+ WHEN("no values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_while(not_equal_to(0))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed and the error"){
+ auto required = rxu::to_vector({
+ on.error(690, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 690)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip_while, error before", "[skip_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("skip_while on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.error(500, ex),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10)
+ });
+
+ WHEN("only one value is taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_while(not_equal_to(72))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(460, 72),
+ on.error(500, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 500)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip_while, dispose before", "[skip_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 100),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10)
+ });
+
+ WHEN("3 values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_while(not_equal_to(100))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ },
+ 250
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ std::vector<rxcpp::notifications::recorded<std::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > required;
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("skip_while, dispose after", "[skip_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4), //this is skipped due to delayed subscription
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10)
+ });
+
+ WHEN("5 values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .skip_while(not_equal_to(1))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ },
+ 400
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 400)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}