summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorRafaƂ Borowiak <ravirael@gmail.com>2016-12-09 16:33:08 +0100
committerKirk Shoop <kirk.shoop@microsoft.com>2016-12-09 07:33:08 -0800
commit0cf6cacc8114b9e503b674bdf21042892f73fb3a (patch)
treef9532ba6e3f2b2da2701331f3bd895f9e18bec1f /Rx/v2
parent3ba04a7f41a0e3b3136c29477e6ac68368b39a1a (diff)
downloadRxCpp-0cf6cacc8114b9e503b674bdf21042892f73fb3a.tar.gz
Issue #283: take_while operator implementation (#287)
* Implementation of take_while operator and tests * Refactored tests and changed documentation * Removed 'noexcept' specifier from helper class in take_while test * Removed 'const' specifier from not_equal_to helper class in take_while test in order to get rid of MSVC assignment operator warning.
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take_while.hpp129
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp18
-rw-r--r--Rx/v2/src/rxcpp/rx-operators.hpp1
-rw-r--r--Rx/v2/test/CMakeLists.txt1
-rw-r--r--Rx/v2/test/operators/take_while.cpp549
5 files changed, 698 insertions, 0 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_while.hpp b/Rx/v2/src/rxcpp/operators/rx-take_while.hpp
new file mode 100644
index 0000000..8ad3537
--- /dev/null
+++ b/Rx/v2/src/rxcpp/operators/rx-take_while.hpp
@@ -0,0 +1,129 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+#pragma once
+
+#if !defined(RXCPP_OPERATORS_RX_TAKE_WHILE_HPP)
+#define RXCPP_OPERATORS_RX_TAKE_WHILE_HPP
+
+#include "../rx-includes.hpp"
+
+namespace rxcpp {
+
+namespace operators {
+
+namespace detail {
+
+template<class T, class Observable, class Predicate>
+struct take_while : public operator_base<T>
+{
+ typedef rxu::decay_t<Observable> source_type;
+ typedef rxu::decay_t<Predicate> test_type;
+ struct values
+ {
+ values(source_type s, test_type t)
+ : source(std::move(s))
+ , test(std::move(t))
+ {
+ }
+ source_type source;
+ test_type test;
+ };
+ values initial;
+
+ take_while(source_type s, test_type t)
+ : initial(std::move(s), std::move(t))
+ {
+ }
+
+ struct mode
+ {
+ enum type {
+ taking, // capture messages
+ triggered, // ignore messages
+ errored, // error occured
+ stopped // observable completed
+ };
+ };
+
+ template<class Subscriber>
+ void on_subscribe(const Subscriber& s) const {
+
+ typedef Subscriber output_type;
+ struct state_type
+ : public std::enable_shared_from_this<state_type>
+ , public values
+ {
+ state_type(const values& i, const output_type& oarg)
+ : values(i)
+ , mode_value(mode::taking)
+ , out(oarg)
+ {
+ }
+ typename mode::type mode_value;
+ output_type out;
+ };
+ // take a copy of the values for each subscription
+ auto state = std::make_shared<state_type>(initial, s);
+
+ composite_subscription source_lifetime;
+
+ s.add(source_lifetime);
+
+ state->source.subscribe(
+ // split subscription lifetime
+ source_lifetime,
+ // on_next
+ [state, source_lifetime](T t) {
+ if (state->mode_value < mode::triggered) {
+ if (state->test(t)) {
+ state->out.on_next(t);
+ } else {
+ state->mode_value = mode::triggered;
+ // must shutdown source before signaling completion
+ source_lifetime.unsubscribe();
+ state->out.on_completed();
+ }
+ }
+ },
+ // on_error
+ [state](std::exception_ptr e) {
+ state->mode_value = mode::errored;
+ state->out.on_error(e);
+ },
+ // on_completed
+ [state]() {
+ state->mode_value = mode::stopped;
+ state->out.on_completed();
+ }
+ );
+ }
+};
+
+template<class T>
+class take_while_factory
+{
+ typedef rxu::decay_t<T> test_type;
+ test_type test;
+public:
+ take_while_factory(test_type t) : test(std::move(t)) {}
+ template<class Observable>
+ auto operator()(Observable&& source)
+ -> observable<rxu::value_type_t<rxu::decay_t<Observable>>, take_while<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, test_type>> {
+ return observable<rxu::value_type_t<rxu::decay_t<Observable>>, take_while<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, test_type>>(
+ take_while<rxu::value_type_t<rxu::decay_t<Observable>>, Observable, test_type>(std::forward<Observable>(source), test));
+ }
+};
+
+}
+
+template<class T>
+auto take_while(T&& t)
+ -> detail::take_while_factory<T> {
+ return detail::take_while_factory<T>(std::forward<T>(t));
+}
+
+}
+
+}
+
+#endif
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 7d37f45..e3b1d13 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -2940,6 +2940,24 @@ public:
return take_until(rxs::timer(when, cn), cn);
}
+ /*! For the first items fulfilling the predicate from this observable emit them from the new observable that is returned.
+
+ \tparam Predicate the type of the predicate
+
+ \param t the predicate
+
+ \return An observable that emits only the first items emitted by the source Observable fulfilling the predicate, or all of the items from the source observable if the predicate never returns false
+*/
+ template<class Predicate>
+ auto take_while(Predicate t) const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> observable<T, rxo::detail::take_while<T, this_type, Predicate>>
+ /// \endcond
+ {
+ return observable<T, rxo::detail::take_while<T, this_type, Predicate>>(
+ rxo::detail::take_while<T, this_type, Predicate>(*this, t));
+ }
+
/*! Infinitely repeat this observable.
\return An observable that emits the items emitted by the source observable repeatedly and in sequence.
diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp
index 3f920b8..117913b 100644
--- a/Rx/v2/src/rxcpp/rx-operators.hpp
+++ b/Rx/v2/src/rxcpp/rx-operators.hpp
@@ -128,6 +128,7 @@ public:
#include "operators/rx-take.hpp"
#include "operators/rx-take_last.hpp"
#include "operators/rx-take_until.hpp"
+#include "operators/rx-take_while.hpp"
#include "operators/rx-tap.hpp"
#include "operators/rx-time_interval.hpp"
#include "operators/rx-timeout.hpp"
diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt
index 55d1b84..1a94a1b 100644
--- a/Rx/v2/test/CMakeLists.txt
+++ b/Rx/v2/test/CMakeLists.txt
@@ -71,6 +71,7 @@ set(TEST_SOURCES
${TEST_DIR}/operators/take.cpp
${TEST_DIR}/operators/take_last.cpp
${TEST_DIR}/operators/take_until.cpp
+ ${TEST_DIR}/operators/take_while.cpp
${TEST_DIR}/operators/tap.cpp
${TEST_DIR}/operators/time_interval.cpp
${TEST_DIR}/operators/timeout.cpp
diff --git a/Rx/v2/test/operators/take_while.cpp b/Rx/v2/test/operators/take_while.cpp
new file mode 100644
index 0000000..6a23b86
--- /dev/null
+++ b/Rx/v2/test/operators/take_while.cpp
@@ -0,0 +1,549 @@
+#include "../test.h"
+
+namespace {
+ class not_equal_to {
+ int value;
+ public:
+ not_equal_to(int value) : value(value) { }
+ bool operator()(int i) const {
+ return i != value;
+ }
+ };
+}
+
+SCENARIO("take_while not equal to 4", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 2),
+ on.next(220, 3),
+ on.next(230, 4),
+ on.next(240, 5),
+ on.completed(250)
+ });
+
+ WHEN("values before 4 are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(4))
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 2),
+ on.next(220, 3),
+ on.completed(230)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 230)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("take_while, complete after", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.completed(690)
+ });
+
+ WHEN("all are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(0))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.completed(690)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 690)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("take_while, complete before", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.completed(690)
+ });
+
+ WHEN("10 values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(72))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.completed(460)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 460)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("take_while, error after", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ std::runtime_error ex("take_while on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.error(690, ex)
+ });
+
+ WHEN("all values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(0))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed and the error"){
+ auto required = rxu::to_vector({
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.error(690, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 690)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("take_while, error same", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.error(690, std::runtime_error("error in unsubscribed stream"))
+ });
+
+ WHEN("all but one values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(10))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.completed(630)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 630)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("take_while, error before", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10),
+ on.error(690, std::runtime_error("error in unsubscribed stream"))
+ });
+
+ WHEN("3 values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(1))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ }
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.completed(280)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 280)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("take_while, dispose before", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10)
+ });
+
+ WHEN("3 values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(100))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ },
+ 250
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 9),
+ on.next(230, 13)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+
+SCENARIO("take_while, dispose after", "[take_while][operators]"){
+ GIVEN("a source"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(70, 6),
+ on.next(150, 4),
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.next(280, 1),
+ on.next(300, -1),
+ on.next(310, 3),
+ on.next(340, 8),
+ on.next(370, 11),
+ on.next(410, 15),
+ on.next(415, 16),
+ on.next(460, 72),
+ on.next(510, 76),
+ on.next(560, 32),
+ on.next(570, -100),
+ on.next(580, -3),
+ on.next(590, 5),
+ on.next(630, 10)
+ });
+
+ WHEN("3 values are taken"){
+
+ auto res = w.start(
+ [xs]() {
+ return xs
+ .take_while(not_equal_to(1))
+ // forget type to workaround lambda deduction bug on msvc 2013
+ .as_dynamic();
+ },
+ 400
+ );
+
+ THEN("the output only contains items sent while subscribed"){
+ auto required = rxu::to_vector({
+ on.next(210, 9),
+ on.next(230, 13),
+ on.next(270, 7),
+ on.completed(280)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was 1 subscription/unsubscription to the source"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 280)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+
+ }
+ }
+}
+