summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorValery Kopylov <v-valkop@microsoft.com>2015-06-09 11:54:39 +0300
committerValery Kopylov <v-valkop@microsoft.com>2015-06-09 13:00:57 +0300
commit34359c702a28ca772048e7941429860aea041542 (patch)
treee58c1084e97ae1ec789212e9a6e68c51d4adb6e9 /Rx
parent8390b5fb4a0ee0b963beeb7b3e1881a79d8b4bf3 (diff)
downloadRxCpp-34359c702a28ca772048e7941429860aea041542.tar.gz
Implement Kirk's proposals from issue #136
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/blocking_observable.cpp52
-rw-r--r--Rx/v2/examples/doxygen/math.cpp18
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-reduce.hpp77
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp17
4 files changed, 103 insertions, 61 deletions
diff --git a/Rx/v2/examples/doxygen/blocking_observable.cpp b/Rx/v2/examples/doxygen/blocking_observable.cpp
index 2ed5526..ee94013 100644
--- a/Rx/v2/examples/doxygen/blocking_observable.cpp
+++ b/Rx/v2/examples/doxygen/blocking_observable.cpp
@@ -23,6 +23,20 @@ SCENARIO("blocking first empty sample"){
printf("//! [blocking first empty sample]\n");
}
+SCENARIO("blocking first error sample"){
+ printf("//! [blocking first error sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ as_blocking();
+ try {
+ auto first = values.first();
+ printf("first = %d\n", first);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking first error sample]\n");
+}
+
SCENARIO("blocking last sample"){
printf("//! [blocking last sample]\n");
auto values = rxcpp::observable<>::range(1, 3).as_blocking();
@@ -43,6 +57,20 @@ SCENARIO("blocking last empty sample"){
printf("//! [blocking last empty sample]\n");
}
+SCENARIO("blocking last error sample"){
+ printf("//! [blocking last error sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).
+ concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
+ as_blocking();
+ try {
+ auto last = values.last();
+ printf("last = %d\n", last);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking last error sample]\n");
+}
+
SCENARIO("blocking count sample"){
printf("//! [blocking count sample]\n");
auto values = rxcpp::observable<>::range(1, 3).as_blocking();
@@ -56,8 +84,12 @@ SCENARIO("blocking count error sample"){
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
as_blocking();
- auto count = values.count();
- printf("count = %d\n", count);
+ try {
+ auto count = values.count();
+ printf("count = %d\n", count);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
printf("//! [blocking count error sample]\n");
}
@@ -86,8 +118,12 @@ SCENARIO("blocking sum error sample"){
auto values = rxcpp::observable<>::range(1, 3).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
as_blocking();
- auto sum = values.sum();
- printf("sum = %d\n", sum);
+ try {
+ auto sum = values.sum();
+ printf("sum = %d\n", sum);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
printf("//! [blocking sum error sample]\n");
}
@@ -116,7 +152,11 @@ SCENARIO("blocking average error sample"){
auto values = rxcpp::observable<>::range(1, 4).
concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))).
as_blocking();
- auto average = values.average();
- printf("average = %lf\n", average);
+ try {
+ auto average = values.average();
+ printf("average = %lf\n", average);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
printf("//! [blocking average error sample]\n");
}
diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp
index 3e612c0..2338bfa 100644
--- a/Rx/v2/examples/doxygen/math.cpp
+++ b/Rx/v2/examples/doxygen/math.cpp
@@ -73,6 +73,12 @@ SCENARIO("count error sample"){
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::runtime_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
[](){printf("OnCompleted\n");});
printf("//! [count error sample]\n");
}
@@ -111,6 +117,12 @@ SCENARIO("sum error sample"){
values.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::runtime_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
[](){printf("OnCompleted\n");});
printf("//! [sum error sample]\n");
}
@@ -149,6 +161,12 @@ SCENARIO("average error sample"){
values.
subscribe(
[](double v){printf("OnNext: %lf\n", v);},
+ [](std::exception_ptr ep){
+ try {std::rethrow_exception(ep);}
+ catch (const std::runtime_error& ex) {
+ printf("OnError: %s\n", ex.what());
+ }
+ },
[](){printf("OnCompleted\n");});
printf("//! [average error sample]\n");
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
index 4a40b63..588266b 100644
--- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
@@ -12,7 +12,7 @@ namespace rxcpp {
class empty_error: public std::runtime_error
{
public:
- empty_error(const std::string& msg):
+ explicit empty_error(const std::string& msg):
std::runtime_error(msg)
{}
};
@@ -91,19 +91,17 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp
~reduce_initial_type()
{
}
- reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s, bool _abort_on_error)
+ reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
: source(std::move(o))
, accumulator(std::move(a))
, result_selector(std::move(rs))
, seed(std::move(s))
- , abort_on_error(_abort_on_error)
{
}
source_type source;
accumulator_type accumulator;
result_selector_type result_selector;
seed_type seed;
- bool abort_on_error;
private:
reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE;
@@ -113,8 +111,8 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp
~reduce()
{
}
- reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s, bool abort_on_error = true)
- : initial(std::move(o), std::move(a), std::move(rs), std::move(s), abort_on_error)
+ reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s)
+ : initial(std::move(o), std::move(a), std::move(rs), std::move(s))
{
}
template<class Subscriber>
@@ -138,39 +136,28 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp
reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE;
};
auto state = std::make_shared<reduce_state_type>(initial, std::move(o));
- auto on_completed = [state]() {
- rxu::maybe<value_type> result;
- try {
- result.reset(state->result_selector(state->current));
- } catch (const std::exception&) {
- state->out.on_error(std::current_exception());
- return;
- }
- state->out.on_next(result.get());
- state->out.on_completed();
- };
state->source.subscribe(
state->out,
// on_next
[state](T t) {
- try {
- auto next = state->accumulator(state->current, t);
- state->current = next;
- } catch (const std::exception&) {
- state->out.on_error(std::current_exception());
- }
+ auto next = state->accumulator(state->current, t);
+ state->current = next;
},
// on_error
- [state, on_completed](std::exception_ptr e) {
- if (state->abort_on_error) {
- state->out.on_error(e);
- }
- else {
- on_completed();
- }
+ [state](std::exception_ptr e) {
+ state->out.on_error(e);
},
// on_completed
- on_completed
+ [state]() {
+ auto result = on_exception(
+ [&](){return state->result_selector(state->current);},
+ state->out);
+ if (result.empty()) {
+ return;
+ }
+ state->out.on_next(result.get());
+ state->out.on_completed();
+ }
);
}
private:
@@ -256,35 +243,27 @@ struct average {
}
return avg;
}
- throw rxcpp::empty_error("No elements");
+ throw rxcpp::empty_error("average() requires a stream with at least one value");
}
};
template<class T>
struct sum {
- struct seed_type
- {
- seed_type()
- : value()
- , has_accumulation(false)
- {
- }
- T value;
- bool has_accumulation;
- };
+ typedef rxu::maybe<T> seed_type;
seed_type seed() {
- return seed_type{};
+ return seed_type();
}
seed_type operator()(seed_type a, T v) {
- a.value += v;
- a.has_accumulation = true;
+ if (a.empty())
+ a.reset(v);
+ else
+ *a = *a + v;
return a;
}
T operator()(seed_type a) {
- if (a.has_accumulation) {
- return a.value;
- }
- throw rxcpp::empty_error("No elements");
+ if (a.empty())
+ throw rxcpp::empty_error("sum() requires a stream with at least one value");
+ return *a;
}
};
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index a77b083..27a8b36 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -293,7 +293,12 @@ public:
\snippet output.txt blocking first empty sample
*/
T first() {
- return source.first().as_blocking().last();
+ rxu::maybe<T> result;
+ composite_subscription cs;
+ subscribe(cs, [&](T v){result.reset(v); cs.unsubscribe();});
+ if (result.empty())
+ throw rxcpp::empty_error("first() requires a stream with at least one value");
+ return result.get();
}
/*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
@@ -320,7 +325,7 @@ public:
if (!error.empty())
std::rethrow_exception(error.get());
if (result.empty())
- throw rxcpp::empty_error("No elements");
+ throw rxcpp::empty_error("last() requires a stream with at least one value");
return result.get();
}
@@ -2094,7 +2099,7 @@ public:
-> typename defer_reduce<int, rxu::count, rxu::defer_type<identity_for, int>>::observable_type
/// \endcond
{
- return defer_reduce<int, rxu::count, rxu::defer_type<identity_for, int>>::make(source_operator, rxu::count(), identity_for<int>(), 0, false);
+ return defer_reduce<int, rxu::count, rxu::defer_type<identity_for, int>>::make(source_operator, rxu::count(), identity_for<int>(), 0);
}
/*! For each item from this observable reduce it by adding to the previous items.
@@ -2118,7 +2123,7 @@ public:
-> typename defer_reduce<rxu::defer_seed_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>>::observable_type
/// \endcond
{
- return defer_reduce<rxu::defer_seed_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>>::make(source_operator, rxo::detail::sum<T>(), rxo::detail::sum<T>(), rxo::detail::sum<T>().seed(), false);
+ return defer_reduce<rxu::defer_seed_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>>::make(source_operator, rxo::detail::sum<T>(), rxo::detail::sum<T>(), rxo::detail::sum<T>().seed());
}
/*! For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end.
@@ -2142,7 +2147,7 @@ public:
-> typename defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::observable_type
/// \endcond
{
- return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed(), false);
+ return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed());
}
/*! For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned.
@@ -2503,7 +2508,7 @@ auto observable<T, SourceOperator>::last() const
return this->reduce(
seed,
[](rxu::maybe<T>, T t){return rxu::maybe<T>(std::move(t));},
- [](rxu::maybe<T> result){return result.empty() ? throw rxcpp::empty_error("No elements") : result.get();});
+ [](rxu::maybe<T> result){return result.empty() ? throw rxcpp::empty_error("last() requires a stream with at least one value") : result.get();});
}
template<class T, class SourceOperator>