diff options
author | Grigoriy Chudnov <g.chudnov@gmail.com> | 2015-12-16 15:14:54 +0300 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-12-17 01:28:34 -0800 |
commit | 412b64138e7e62bfe50ed807a0835ed5dd32de98 (patch) | |
tree | f1c2fc8fb5fa87a49d4c5e18ed052e4eb7a484b5 /Rx/v2 | |
parent | bd39892795a4b2d5f02373afd650394dd3280094 (diff) | |
download | RxCpp-412b64138e7e62bfe50ed807a0835ed5dd32de98.tar.gz |
add min and max operators
Diffstat (limited to 'Rx/v2')
-rw-r--r-- | Rx/v2/examples/doxygen/blocking_observable.cpp | 68 | ||||
-rw-r--r-- | Rx/v2/examples/doxygen/math.cpp | 88 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-reduce.hpp | 40 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 92 | ||||
-rw-r--r-- | Rx/v2/test/operators/reduce.cpp | 293 |
5 files changed, 581 insertions, 0 deletions
diff --git a/Rx/v2/examples/doxygen/blocking_observable.cpp b/Rx/v2/examples/doxygen/blocking_observable.cpp index 1e7dd56..eb9633f 100644 --- a/Rx/v2/examples/doxygen/blocking_observable.cpp +++ b/Rx/v2/examples/doxygen/blocking_observable.cpp @@ -159,3 +159,71 @@ SCENARIO("blocking average error sample"){ } printf("//! [blocking average error sample]\n"); } + +SCENARIO("blocking max sample"){ + printf("//! [blocking max sample]\n"); + auto values = rxcpp::observable<>::range(1, 4).as_blocking(); + auto max = values.max(); + printf("max = %d\n", max); + printf("//! [blocking max sample]\n"); +} + +SCENARIO("blocking max empty sample"){ + printf("//! [blocking max empty sample]\n"); + auto values = rxcpp::observable<>::empty<int>().as_blocking(); + try { + auto max = values.max(); + printf("max = %d\n", max); + } catch (const rxcpp::empty_error& ex) { + printf("Exception: %s\n", ex.what()); + } + printf("//! [blocking max empty sample]\n"); +} + +SCENARIO("blocking max error sample"){ + printf("//! [blocking max error sample]\n"); + auto values = rxcpp::observable<>::range(1, 4). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + as_blocking(); + try { + auto max = values.max(); + printf("max = %d\n", max); + } catch (const std::exception& ex) { + printf("Exception: %s\n", ex.what()); + } + printf("//! [blocking max error sample]\n"); +} + +SCENARIO("blocking min sample"){ + printf("//! [blocking min sample]\n"); + auto values = rxcpp::observable<>::range(1, 4).as_blocking(); + auto min = values.min(); + printf("min = %d\n", min); + printf("//! [blocking min sample]\n"); +} + +SCENARIO("blocking min empty sample"){ + printf("//! [blocking min empty sample]\n"); + auto values = rxcpp::observable<>::empty<int>().as_blocking(); + try { + auto min = values.min(); + printf("min = %d\n", min); + } catch (const rxcpp::empty_error& ex) { + printf("Exception: %s\n", ex.what()); + } + printf("//! [blocking min empty sample]\n"); +} + +SCENARIO("blocking min error sample"){ + printf("//! [blocking min error sample]\n"); + auto values = rxcpp::observable<>::range(1, 4). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + as_blocking(); + try { + auto min = values.min(); + printf("min = %d\n", min); + } catch (const std::exception& ex) { + printf("Exception: %s\n", ex.what()); + } + printf("//! [blocking min error sample]\n"); +} diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp index 2338bfa..376cf97 100644 --- a/Rx/v2/examples/doxygen/math.cpp +++ b/Rx/v2/examples/doxygen/math.cpp @@ -170,3 +170,91 @@ SCENARIO("average error sample"){ [](){printf("OnCompleted\n");}); printf("//! [average error sample]\n"); } + +SCENARIO("max sample"){ + printf("//! [max sample]\n"); + auto values = rxcpp::observable<>::range(1, 4).max(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [max sample]\n"); +} + +SCENARIO("max empty sample"){ + printf("//! [max empty sample]\n"); + auto values = rxcpp::observable<>::empty<int>().max(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const rxcpp::empty_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [max empty sample]\n"); +} + +SCENARIO("max error sample"){ + printf("//! [max error sample]\n"); + auto values = rxcpp::observable<>::range(1, 4). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + max(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::runtime_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [max error sample]\n"); +} + +SCENARIO("min sample"){ + printf("//! [min sample]\n"); + auto values = rxcpp::observable<>::range(1, 4).min(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](){printf("OnCompleted\n");}); + printf("//! [min sample]\n"); +} + +SCENARIO("min empty sample"){ + printf("//! [min empty sample]\n"); + auto values = rxcpp::observable<>::empty<int>().min(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const rxcpp::empty_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [min empty sample]\n"); +} + +SCENARIO("min error sample"){ + printf("//! [min error sample]\n"); + auto values = rxcpp::observable<>::range(1, 4). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + min(); + values. + subscribe( + [](double v){printf("OnNext: %lf\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::runtime_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, + [](){printf("OnCompleted\n");}); + printf("//! [min error sample]\n"); +} diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp index 588266b..144c225 100644 --- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp @@ -267,6 +267,46 @@ struct sum { } }; +template<class T> +struct max { + typedef rxu::maybe<T> seed_type; + seed_type seed() { + return seed_type(); + } + seed_type operator()(seed_type a, T v) { + if (a.empty()) + a.reset(v); + else + *a = (v < *a ? *a : v); + return a; + } + T operator()(seed_type a) { + if (a.empty()) + throw rxcpp::empty_error("max() requires a stream with at least one value"); + return *a; + } +}; + +template<class T> +struct min { + typedef rxu::maybe<T> seed_type; + seed_type seed() { + return seed_type(); + } + seed_type operator()(seed_type a, T v) { + if (a.empty()) + a.reset(v); + else + *a = (*a < v ? *a : v); + return a; + } + T operator()(seed_type a) { + if (a.empty()) + throw rxcpp::empty_error("min() requires a stream with at least one value"); + return *a; + } +}; + } template<class Seed, class Accumulator, class ResultSelector> diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 30b1802..2bb5b14 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -422,6 +422,48 @@ public: double average() const { return source.average().as_blocking().last(); } + + /*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. + + \return The max of all items emitted by this blocking_observable. + + \sample + When the source observable emits at least one item: + \snippet blocking_observable.cpp blocking max sample + \snippet output.txt blocking max sample + + When the source observable is empty: + \snippet blocking_observable.cpp blocking max empty sample + \snippet output.txt blocking max empty sample + + When the source observable calls on_error: + \snippet blocking_observable.cpp blocking max error sample + \snippet output.txt blocking max error sample +*/ + T max() const { + return source.max().as_blocking().last(); + } + + /*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. + + \return The min of all items emitted by this blocking_observable. + + \sample + When the source observable emits at least one item: + \snippet blocking_observable.cpp blocking min sample + \snippet output.txt blocking min sample + + When the source observable is empty: + \snippet blocking_observable.cpp blocking min empty sample + \snippet output.txt blocking min empty sample + + When the source observable calls on_error: + \snippet blocking_observable.cpp blocking min error sample + \snippet output.txt blocking min error sample +*/ + T min() const { + return source.min().as_blocking().last(); + } }; template<> @@ -2268,6 +2310,8 @@ public: - rxcpp::observable::count - rxcpp::observable::sum - rxcpp::observable::average + - rxcpp::observable::min + - rxcpp::observable::max \sample Geometric mean of source values: @@ -2406,6 +2450,54 @@ public: return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed()); } + /*! For each item from this observable reduce it by taking the max value of the previous items. + + \return An observable that emits a single item: the max of elements emitted by the source observable. + + \sample + \snippet math.cpp max sample + \snippet output.txt max sample + + When the source observable completes without emitting any items: + \snippet math.cpp max empty sample + \snippet output.txt max empty sample + + When the source observable calls on_error: + \snippet math.cpp max error sample + \snippet output.txt max error sample + */ + auto max() const + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>>::observable_type + /// \endcond + { + return defer_reduce<rxu::defer_seed_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>, rxu::defer_type<rxo::detail::max, T>>::make(source_operator, rxo::detail::max<T>(), rxo::detail::max<T>(), rxo::detail::max<T>().seed()); + } + + /*! For each item from this observable reduce it by taking the min value of the previous items. + + \return An observable that emits a single item: the min of elements emitted by the source observable. + + \sample + \snippet math.cpp min sample + \snippet output.txt min sample + + When the source observable completes without emitting any items: + \snippet math.cpp min empty sample + \snippet output.txt min empty sample + + When the source observable calls on_error: + \snippet math.cpp min error sample + \snippet output.txt min error sample + */ + auto min() const + /// \cond SHOW_SERVICE_MEMBERS + -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>>::observable_type + /// \endcond + { + return defer_reduce<rxu::defer_seed_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>, rxu::defer_type<rxo::detail::min, T>>::make(source_operator, rxo::detail::min<T>(), rxo::detail::min<T>(), rxo::detail::min<T>().seed()); + } + /*! For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned. \tparam Seed the type of the initial value for the accumulator diff --git a/Rx/v2/test/operators/reduce.cpp b/Rx/v2/test/operators/reduce.cpp index e0f8e5e..b91d4b9 100644 --- a/Rx/v2/test/operators/reduce.cpp +++ b/Rx/v2/test/operators/reduce.cpp @@ -99,3 +99,296 @@ SCENARIO("average some data", "[reduce][average][operators]"){ } } } + +SCENARIO("sum some data", "[reduce][sum][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<int> d_on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 3), + on.next(220, 4), + on.next(230, 2), + on.completed(250) + }); + + WHEN("sum is calculated"){ + + auto res = w.start( + [&]() { + return xs.sum(); + } + ); + + THEN("the output contains the sum of source values"){ + auto required = rxu::to_vector({ + d_on.next(250, 9), + d_on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("max", "[reduce][max][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<int> d_on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 3), + on.next(220, 4), + on.next(230, 2), + on.completed(250) + }); + + WHEN("max is calculated"){ + + auto res = w.start( + [&]() { + return xs.max(); + } + ); + + THEN("the output contains the max of source values"){ + auto required = rxu::to_vector({ + d_on.next(250, 4), + d_on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("max, empty", "[reduce][max][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<int> d_on; + + std::runtime_error ex("max on_error"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("max is calculated"){ + + auto res = w.start( + [&]() { + return xs.max(); + } + ); + + THEN("the output contains only error message"){ + auto required = rxu::to_vector({ + d_on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("max, error", "[reduce][max][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<int> d_on; + + std::runtime_error ex("max on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("max is calculated"){ + + auto res = w.start( + [&]() { + return xs.max(); + } + ); + + THEN("the output contains only error message"){ + auto required = rxu::to_vector({ + d_on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("min", "[reduce][min][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<int> d_on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 3), + on.next(220, 4), + on.next(230, 2), + on.completed(250) + }); + + WHEN("min is calculated"){ + + auto res = w.start( + [&]() { + return xs.min(); + } + ); + + THEN("the output contains the min of source values"){ + auto required = rxu::to_vector({ + d_on.next(250, 2), + d_on.completed(250) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("min, empty", "[reduce][min][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<int> d_on; + + std::runtime_error ex("min on_error"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("min is calculated"){ + + auto res = w.start( + [&]() { + return xs.min(); + } + ); + + THEN("the output contains only error message"){ + auto required = rxu::to_vector({ + d_on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("min, error", "[reduce][min][operators]"){ + GIVEN("a test hot observable of ints"){ + auto sc = rxsc::make_test(); + auto w = sc.create_worker(); + const rxsc::test::messages<int> on; + const rxsc::test::messages<int> d_on; + + std::runtime_error ex("min on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("min is calculated"){ + + auto res = w.start( + [&]() { + return xs.min(); + } + ); + + THEN("the output contains only error message"){ + auto required = rxu::to_vector({ + d_on.error(250, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was one subscription and one unsubscription"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} |