summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorValery Kopylov <v-valkop@microsoft.com>2015-06-02 14:30:03 +0300
committerValery Kopylov <v-valkop@microsoft.com>2015-06-09 13:00:53 +0300
commit8f300c39206fc3275778d2cd761c9772bf0741ac (patch)
tree563bdaad69f4918f9ddfbd49f7e7774f80f0b5d1 /Rx
parent1d07307b5a2959eee5352f1795be7eca3b518898 (diff)
downloadRxCpp-8f300c39206fc3275778d2cd761c9772bf0741ac.tar.gz
Call on_error() when reducing an empty observable.
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/math.cpp95
-rw-r--r--Rx/v2/examples/doxygen/reduce.cpp20
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-reduce.hpp14
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp20
4 files changed, 113 insertions, 36 deletions
diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp
index dcf3a90..bc782c0 100644
--- a/Rx/v2/examples/doxygen/math.cpp
+++ b/Rx/v2/examples/doxygen/math.cpp
@@ -13,24 +13,21 @@ SCENARIO("first sample"){
printf("//! [first sample]\n");
}
-//SCENARIO("first empty sample"){
-// printf("//! [first empty sample]\n");
-// auto values = rxcpp::observable<>::empty<int>().first();
-// values.
-// subscribe(
-// [](int v){printf("OnNext: %d\n", v);},
-// [](std::exception_ptr ep){
-// try {std::rethrow_exception(ep);}
-// catch (const std::exception& ex) {
-// printf("OnError: %s\n", ex.what());
-// }
-// catch (...) {
-// printf("OnError:\n");
-// }
-// },
-// [](){printf("OnCompleted\n");});
-// printf("//! [first empty sample]\n");
-//}
+SCENARIO("first empty sample"){
+ printf("//! [first empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().first();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [first empty sample]\n");
+}
SCENARIO("last sample"){
printf("//! [last sample]\n");
@@ -42,21 +39,21 @@ SCENARIO("last sample"){
printf("//! [last sample]\n");
}
-//SCENARIO("last empty sample"){
-// printf("//! [last empty sample]\n");
-// auto values = rxcpp::observable<>::empty<int>().last();
-// values.
-// subscribe(
-// [](int v){printf("OnNext: %d\n", v);},
-// [](std::exception_ptr ep){
-// try {std::rethrow_exception(ep);}
-// catch (const std::exception& ex) {
-// printf("OnError: %s\n", ex.what());
-// }
-// },
-// [](){printf("OnCompleted\n");});
-// printf("//! [last empty sample]\n");
-//}
+SCENARIO("last empty sample"){
+ printf("//! [last empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().last();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [last empty sample]\n");
+}
SCENARIO("count sample"){
printf("//! [count sample]\n");
@@ -78,6 +75,22 @@ SCENARIO("sum sample"){
printf("//! [sum sample]\n");
}
+SCENARIO("sum empty sample"){
+ printf("//! [sum empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().sum();
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [sum empty sample]\n");
+}
+
SCENARIO("average sample"){
printf("//! [average sample]\n");
auto values = rxcpp::observable<>::range(1, 4).average();
@@ -87,3 +100,19 @@ SCENARIO("average sample"){
[](){printf("OnCompleted\n");});
printf("//! [average sample]\n");
}
+
+SCENARIO("average empty sample"){
+ printf("//! [average empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().average();
+ values.
+ subscribe(
+ [](double v){printf("OnNext: %lf\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [average empty sample]\n");
+}
diff --git a/Rx/v2/examples/doxygen/reduce.cpp b/Rx/v2/examples/doxygen/reduce.cpp
index 0005bf6..9035e5e 100644
--- a/Rx/v2/examples/doxygen/reduce.cpp
+++ b/Rx/v2/examples/doxygen/reduce.cpp
@@ -22,3 +22,23 @@ SCENARIO("reduce sample"){
[](){printf("OnCompleted\n");});
printf("//! [reduce sample]\n");
}
+
+SCENARIO("reduce empty sample"){
+ printf("//! [reduce empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().
+ reduce(
+ 1,
+ [](int seed, int){return seed;},
+ [](int res){return res;});
+ values.
+ subscribe(
+ [](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::exception& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
+ [](){printf("OnCompleted\n");});
+ printf("//! [reduce empty sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
index 221b163..57c0b2a 100644
--- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
@@ -88,12 +88,14 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp
, accumulator(std::move(a))
, result_selector(std::move(rs))
, seed(std::move(s))
+ , has_accumulation(false)
{
}
source_type source;
accumulator_type accumulator;
result_selector_type result_selector;
seed_type seed;
+ bool has_accumulation;
private:
reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE;
@@ -134,6 +136,7 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp
[state](T t) {
auto next = state->accumulator(state->current, t);
state->current = next;
+ state->has_accumulation = true;
},
// on_error
[state](std::exception_ptr e) {
@@ -141,9 +144,14 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp
},
// on_completed
[state]() {
- auto result = state->result_selector(state->current);
- state->out.on_next(result);
- state->out.on_completed();
+ if (state->has_accumulation) {
+ auto result = state->result_selector(state->current);
+ state->out.on_next(result);
+ state->out.on_completed();
+ }
+ else {
+ state->out.on_error(std::make_exception_ptr(std::runtime_error("No elements")));
+ }
}
);
}
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 8f8aec7..7d048f6 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -1932,6 +1932,10 @@ public:
Geometric mean of source values:
\snippet reduce.cpp reduce sample
\snippet output.txt reduce sample
+
+ If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers.
+ \snippet reduce.cpp reduce empty sample
+ \snippet output.txt reduce empty sample
*/
template<class Seed, class Accumulator, class ResultSelector>
auto reduce(Seed seed, Accumulator&& a, ResultSelector&& rs) const
@@ -1962,6 +1966,10 @@ public:
\sample
\snippet math.cpp first sample
\snippet output.txt first sample
+
+ If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers.
+ \snippet math.cpp first empty sample
+ \snippet output.txt first empty sample
*/
auto first() const
-> observable<T>;
@@ -1973,6 +1981,10 @@ public:
\sample
\snippet math.cpp last sample
\snippet output.txt last sample
+
+ If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers.
+ \snippet math.cpp last empty sample
+ \snippet output.txt last empty sample
*/
auto last() const
-> observable<T>;
@@ -1995,6 +2007,10 @@ public:
\sample
\snippet math.cpp sum sample
\snippet output.txt sum sample
+
+ If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers.
+ \snippet math.cpp sum empty sample
+ \snippet output.txt sum empty sample
*/
auto sum() const
/// \cond SHOW_SERVICE_MEMBERS
@@ -2011,6 +2027,10 @@ public:
\sample
\snippet math.cpp average sample
\snippet output.txt average sample
+
+ If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers.
+ \snippet math.cpp average empty sample
+ \snippet output.txt average empty sample
*/
auto average() const
/// \cond SHOW_SERVICE_MEMBERS