summaryrefslogtreecommitdiff
path: root/Rx/v2
diff options
context:
space:
mode:
authorGrigoriy Chudnov <g.chudnov@gmail.com>2015-12-16 15:14:54 +0300
committerKirk Shoop <kirk.shoop@microsoft.com>2015-12-17 01:28:34 -0800
commit412b64138e7e62bfe50ed807a0835ed5dd32de98 (patch)
treef1c2fc8fb5fa87a49d4c5e18ed052e4eb7a484b5 /Rx/v2
parentbd39892795a4b2d5f02373afd650394dd3280094 (diff)
downloadRxCpp-412b64138e7e62bfe50ed807a0835ed5dd32de98.tar.gz
add min and max operators
Diffstat (limited to 'Rx/v2')
-rw-r--r--Rx/v2/examples/doxygen/blocking_observable.cpp68
-rw-r--r--Rx/v2/examples/doxygen/math.cpp88
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-reduce.hpp40
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp92
-rw-r--r--Rx/v2/test/operators/reduce.cpp293
5 files changed, 581 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/blocking_observable.cpp b/Rx/v2/examples/doxygen/blocking_observable.cpp
index 1e7dd56..eb9633f 100644
--- a/Rx/v2/examples/doxygen/blocking_observable.cpp
+++ b/Rx/v2/examples/doxygen/blocking_observable.cpp
@@ -159,3 +159,71 @@ SCENARIO("blocking average error sample"){
}
printf("//! [blocking average error sample]\n");
}
+
+SCENARIO("blocking max sample"){
+ printf("//! [blocking max sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).as_blocking();
+ auto max = values.max();
+ printf("max = %d\n", max);
+ printf("//! [blocking max sample]\n");
+}
+
+SCENARIO("blocking max empty sample"){
+ printf("//! [blocking max empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().as_blocking();
+ try {
+ auto max = values.max();
+ printf("max = %d\n", max);
+ } catch (const rxcpp::empty_error& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking max empty sample]\n");
+}
+
+SCENARIO("blocking max error sample"){
+ printf("//! [blocking max error sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ as_blocking();
+ try {
+ auto max = values.max();
+ printf("max = %d\n", max);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking max error sample]\n");
+}
+
+SCENARIO("blocking min sample"){
+ printf("//! [blocking min sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).as_blocking();
+ auto min = values.min();
+ printf("min = %d\n", min);
+ printf("//! [blocking min sample]\n");
+}
+
+SCENARIO("blocking min empty sample"){
+ printf("//! [blocking min empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().as_blocking();
+ try {
+ auto min = values.min();
+ printf("min = %d\n", min);
+ } catch (const rxcpp::empty_error& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking min empty sample]\n");
+}
+
+SCENARIO("blocking min error sample"){
+ printf("//! [blocking min error sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ as_blocking();
+ try {
+ auto min = values.min();
+ printf("min = %d\n", min);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking min error sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp
index 2338bfa..376cf97 100644
--- a/Rx/v2/examples/doxygen/math.cpp
+++ b/Rx/v2/examples/doxygen/math.cpp
@@ -170,3 +170,91 @@ SCENARIO("average error sample"){
[](){printf("OnCompleted\n");});
printf("//! [average error sample]\n");
}
+
+SCENARIO("max sample"){
+ printf("//! [max sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).max();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [max sample]\n");
+}
+
+SCENARIO("max empty sample"){
+ printf("//! [max empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().max();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const rxcpp::empty_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [max empty sample]\n");
+}
+
+SCENARIO("max error sample"){
+ printf("//! [max error sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ max();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::runtime_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [max error sample]\n");
+}
+
+SCENARIO("min sample"){
+ printf("//! [min sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).min();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](){printf("OnCompleted\n");});
+ printf("//! [min sample]\n");
+}
+
+SCENARIO("min empty sample"){
+ printf("//! [min empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().min();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const rxcpp::empty_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [min empty sample]\n");
+}
+
+SCENARIO("min error sample"){
+ printf("//! [min error sample]\n");
+ auto values = rxcpp::observable<>::range(1, 4).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ min();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::runtime_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [min error sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
index 588266b..144c225 100644
--- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
@@ -267,6 +267,46 @@ struct sum {
}
};
+template<class T>
+struct max {
+ typedef rxu::maybe<T> seed_type;
+ seed_type seed() {
+ return seed_type();
+ }
+ seed_type operator()(seed_type a, T v) {
+ if (a.empty())
+ a.reset(v);
+ else
+ *a = (v < *a ? *a : v);
+ return a;
+ }
+ T operator()(seed_type a) {
+ if (a.empty())
+ throw rxcpp::empty_error("max() requires a stream with at least one value");
+ return *a;
+ }
+};
+
+template<class T>
+struct min {
+ typedef rxu::maybe<T> seed_type;
+ seed_type seed() {
+ return seed_type();
+ }
+ seed_type operator()(seed_type a, T v) {
+ if (a.empty())
+ a.reset(v);
+ else
+ *a = (*a < v ? *a : v);
+ return a;
+ }
+ T operator()(seed_type a) {
+ if (a.empty())
+ throw rxcpp::empty_error("min() requires a stream with at least one value");
+ return *a;
+ }
+};
+
}
template<class Seed, class Accumulator, class ResultSelector>
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 30b1802..2bb5b14 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -422,6 +422,48 @@ public:
double average() const {
return source.average().as_blocking().last();
}
+
+ /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
+
+ \return The max of all items emitted by this blocking_observable.
+
+ \sample
+ When the source observable emits at least one item:
+ \snippet blocking_observable.cpp blocking max sample
+ \snippet output.txt blocking max sample
+
+ When the source observable is empty:
+ \snippet blocking_observable.cpp blocking max empty sample
+ \snippet output.txt blocking max empty sample
+
+ When the source observable calls on_error:
+ \snippet blocking_observable.cpp blocking max error sample
+ \snippet output.txt blocking max error sample
+*/
+ T max() const {
+ return source.max().as_blocking().last();
+ }
+
+ /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
+
+ \return The min of all items emitted by this blocking_observable.
+
+ \sample
+ When the source observable emits at least one item:
+ \snippet blocking_observable.cpp blocking min sample
+ \snippet output.txt blocking min sample
+
+ When the source observable is empty:
+ \snippet blocking_observable.cpp blocking min empty sample
+ \snippet output.txt blocking min empty sample
+
+ When the source observable calls on_error:
+ \snippet blocking_observable.cpp blocking min error sample
+ \snippet output.txt blocking min error sample
+*/
+ T min() const {
+ return source.min().as_blocking().last();
+ }
};
template<>
@@ -2268,6 +2310,8 @@ public:
- rxcpp::observable::count
- rxcpp::observable::sum
- rxcpp::observable::average
+ - rxcpp::observable::min
+ - rxcpp::observable::max
\sample
Geometric mean of source values:
@@ -2406,6 +2450,54 @@ public:
return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed());
}
+ /*! For each item from this observable reduce it by taking the max value of the previous items.
+
+ \return An observable that emits a single item: the max of elements emitted by the source observable.
+
+ \sample
+ \snippet math.cpp max sample
+ \snippet output.txt max sample
+
+ When the source observable completes without emitting any items:
+ \snippet math.cpp max empty sample
+ \snippet output.txt max empty sample
+
+ When the source observable calls on_error:
+ \snippet math.cpp max error sample
+ \snippet output.txt max error sample
+ */
+ auto max() const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>>::observable_type
+ /// \endcond
+ {
+ return defer_reduce<rxu::defer_seed_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>>::make(source_operator, rxo::detail::max<T>(), rxo::detail::max<T>(), rxo::detail::max<T>().seed());
+ }
+
+ /*! For each item from this observable reduce it by taking the min value of the previous items.
+
+ \return An observable that emits a single item: the min of elements emitted by the source observable.
+
+ \sample
+ \snippet math.cpp min sample
+ \snippet output.txt min sample
+
+ When the source observable completes without emitting any items:
+ \snippet math.cpp min empty sample
+ \snippet output.txt min empty sample
+
+ When the source observable calls on_error:
+ \snippet math.cpp min error sample
+ \snippet output.txt min error sample
+ */
+ auto min() const
+ /// \cond SHOW_SERVICE_MEMBERS
+ -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>>::observable_type
+ /// \endcond
+ {
+ return defer_reduce<rxu::defer_seed_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>>::make(source_operator, rxo::detail::min<T>(), rxo::detail::min<T>(), rxo::detail::min<T>().seed());
+ }
+
/*! For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.
\tparam Seed the type of the initial value for the accumulator
diff --git a/Rx/v2/test/operators/reduce.cpp b/Rx/v2/test/operators/reduce.cpp
index e0f8e5e..b91d4b9 100644
--- a/Rx/v2/test/operators/reduce.cpp
+++ b/Rx/v2/test/operators/reduce.cpp
@@ -99,3 +99,296 @@ SCENARIO("average some data", "[reduce][average][operators]"){
}
}
}
+
+SCENARIO("sum some data", "[reduce][sum][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<int> d_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 3),
+ on.next(220, 4),
+ on.next(230, 2),
+ on.completed(250)
+ });
+
+ WHEN("sum is calculated"){
+
+ auto res = w.start(
+ [&]() {
+ return xs.sum();
+ }
+ );
+
+ THEN("the output contains the sum of source values"){
+ auto required = rxu::to_vector({
+ d_on.next(250, 9),
+ d_on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("max", "[reduce][max][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<int> d_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 3),
+ on.next(220, 4),
+ on.next(230, 2),
+ on.completed(250)
+ });
+
+ WHEN("max is calculated"){
+
+ auto res = w.start(
+ [&]() {
+ return xs.max();
+ }
+ );
+
+ THEN("the output contains the max of source values"){
+ auto required = rxu::to_vector({
+ d_on.next(250, 4),
+ d_on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("max, empty", "[reduce][max][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<int> d_on;
+
+ std::runtime_error ex("max on_error");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("max is calculated"){
+
+ auto res = w.start(
+ [&]() {
+ return xs.max();
+ }
+ );
+
+ THEN("the output contains only error message"){
+ auto required = rxu::to_vector({
+ d_on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("max, error", "[reduce][max][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<int> d_on;
+
+ std::runtime_error ex("max on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("max is calculated"){
+
+ auto res = w.start(
+ [&]() {
+ return xs.max();
+ }
+ );
+
+ THEN("the output contains only error message"){
+ auto required = rxu::to_vector({
+ d_on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("min", "[reduce][min][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<int> d_on;
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.next(210, 3),
+ on.next(220, 4),
+ on.next(230, 2),
+ on.completed(250)
+ });
+
+ WHEN("min is calculated"){
+
+ auto res = w.start(
+ [&]() {
+ return xs.min();
+ }
+ );
+
+ THEN("the output contains the min of source values"){
+ auto required = rxu::to_vector({
+ d_on.next(250, 2),
+ d_on.completed(250)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("min, empty", "[reduce][min][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<int> d_on;
+
+ std::runtime_error ex("min on_error");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.completed(250)
+ });
+
+ WHEN("min is calculated"){
+
+ auto res = w.start(
+ [&]() {
+ return xs.min();
+ }
+ );
+
+ THEN("the output contains only error message"){
+ auto required = rxu::to_vector({
+ d_on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}
+
+SCENARIO("min, error", "[reduce][min][operators]"){
+ GIVEN("a test hot observable of ints"){
+ auto sc = rxsc::make_test();
+ auto w = sc.create_worker();
+ const rxsc::test::messages<int> on;
+ const rxsc::test::messages<int> d_on;
+
+ std::runtime_error ex("min on_error from source");
+
+ auto xs = sc.make_hot_observable({
+ on.next(150, 1),
+ on.error(250, ex)
+ });
+
+ WHEN("min is calculated"){
+
+ auto res = w.start(
+ [&]() {
+ return xs.min();
+ }
+ );
+
+ THEN("the output contains only error message"){
+ auto required = rxu::to_vector({
+ d_on.error(250, ex)
+ });
+ auto actual = res.get_observer().messages();
+ REQUIRE(required == actual);
+ }
+
+ THEN("there was one subscription and one unsubscription"){
+ auto required = rxu::to_vector({
+ on.subscribe(200, 250)
+ });
+ auto actual = xs.subscriptions();
+ REQUIRE(required == actual);
+ }
+ }
+ }
+}