diff options
author | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-09 11:54:39 +0300 |
---|---|---|
committer | Valery Kopylov <v-valkop@microsoft.com> | 2015-06-09 13:00:57 +0300 |
commit | 34359c702a28ca772048e7941429860aea041542 (patch) | |
tree | e58c1084e97ae1ec789212e9a6e68c51d4adb6e9 /Rx | |
parent | 8390b5fb4a0ee0b963beeb7b3e1881a79d8b4bf3 (diff) | |
download | RxCpp-34359c702a28ca772048e7941429860aea041542.tar.gz |
Implement Kirk's proposals from issue #136
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/examples/doxygen/blocking_observable.cpp | 52 | ||||
-rw-r--r-- | Rx/v2/examples/doxygen/math.cpp | 18 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-reduce.hpp | 77 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 17 |
4 files changed, 103 insertions, 61 deletions
diff --git a/Rx/v2/examples/doxygen/blocking_observable.cpp b/Rx/v2/examples/doxygen/blocking_observable.cpp index 2ed5526..ee94013 100644 --- a/Rx/v2/examples/doxygen/blocking_observable.cpp +++ b/Rx/v2/examples/doxygen/blocking_observable.cpp @@ -23,6 +23,20 @@ SCENARIO("blocking first empty sample"){ printf("//! [blocking first empty sample]\n"); } +SCENARIO("blocking first error sample"){ + printf("//! [blocking first error sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + as_blocking(); + try { + auto first = values.first(); + printf("first = %d\n", first); + } catch (const std::exception& ex) { + printf("Exception: %s\n", ex.what()); + } + printf("//! [blocking first error sample]\n"); +} + SCENARIO("blocking last sample"){ printf("//! [blocking last sample]\n"); auto values = rxcpp::observable<>::range(1, 3).as_blocking(); @@ -43,6 +57,20 @@ SCENARIO("blocking last empty sample"){ printf("//! [blocking last empty sample]\n"); } +SCENARIO("blocking last error sample"){ + printf("//! [blocking last error sample]\n"); + auto values = rxcpp::observable<>::range(1, 3). + concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). + as_blocking(); + try { + auto last = values.last(); + printf("last = %d\n", last); + } catch (const std::exception& ex) { + printf("Exception: %s\n", ex.what()); + } + printf("//! [blocking last error sample]\n"); +} + SCENARIO("blocking count sample"){ printf("//! [blocking count sample]\n"); auto values = rxcpp::observable<>::range(1, 3).as_blocking(); @@ -56,8 +84,12 @@ SCENARIO("blocking count error sample"){ auto values = rxcpp::observable<>::range(1, 3). concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). as_blocking(); - auto count = values.count(); - printf("count = %d\n", count); + try { + auto count = values.count(); + printf("count = %d\n", count); + } catch (const std::exception& ex) { + printf("Exception: %s\n", ex.what()); + } printf("//! [blocking count error sample]\n"); } @@ -86,8 +118,12 @@ SCENARIO("blocking sum error sample"){ auto values = rxcpp::observable<>::range(1, 3). concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). as_blocking(); - auto sum = values.sum(); - printf("sum = %d\n", sum); + try { + auto sum = values.sum(); + printf("sum = %d\n", sum); + } catch (const std::exception& ex) { + printf("Exception: %s\n", ex.what()); + } printf("//! [blocking sum error sample]\n"); } @@ -116,7 +152,11 @@ SCENARIO("blocking average error sample"){ auto values = rxcpp::observable<>::range(1, 4). concat(rxcpp::observable<>::error<int>(std::runtime_error("Error from source"))). as_blocking(); - auto average = values.average(); - printf("average = %lf\n", average); + try { + auto average = values.average(); + printf("average = %lf\n", average); + } catch (const std::exception& ex) { + printf("Exception: %s\n", ex.what()); + } printf("//! [blocking average error sample]\n"); } diff --git a/Rx/v2/examples/doxygen/math.cpp b/Rx/v2/examples/doxygen/math.cpp index 3e612c0..2338bfa 100644 --- a/Rx/v2/examples/doxygen/math.cpp +++ b/Rx/v2/examples/doxygen/math.cpp @@ -73,6 +73,12 @@ SCENARIO("count error sample"){ values. subscribe( [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::runtime_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, [](){printf("OnCompleted\n");}); printf("//! [count error sample]\n"); } @@ -111,6 +117,12 @@ SCENARIO("sum error sample"){ values. subscribe( [](int v){printf("OnNext: %d\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::runtime_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, [](){printf("OnCompleted\n");}); printf("//! [sum error sample]\n"); } @@ -149,6 +161,12 @@ SCENARIO("average error sample"){ values. subscribe( [](double v){printf("OnNext: %lf\n", v);}, + [](std::exception_ptr ep){ + try {std::rethrow_exception(ep);} + catch (const std::runtime_error& ex) { + printf("OnError: %s\n", ex.what()); + } + }, [](){printf("OnCompleted\n");}); printf("//! [average error sample]\n"); } diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp index 4a40b63..588266b 100644 --- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp @@ -12,7 +12,7 @@ namespace rxcpp { class empty_error: public std::runtime_error { public: - empty_error(const std::string& msg): + explicit empty_error(const std::string& msg): std::runtime_error(msg) {} }; @@ -91,19 +91,17 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp ~reduce_initial_type() { } - reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s, bool _abort_on_error) + reduce_initial_type(source_type o, accumulator_type a, result_selector_type rs, seed_type s) : source(std::move(o)) , accumulator(std::move(a)) , result_selector(std::move(rs)) , seed(std::move(s)) - , abort_on_error(_abort_on_error) { } source_type source; accumulator_type accumulator; result_selector_type result_selector; seed_type seed; - bool abort_on_error; private: reduce_initial_type& operator=(reduce_initial_type o) RXCPP_DELETE; @@ -113,8 +111,8 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp ~reduce() { } - reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s, bool abort_on_error = true) - : initial(std::move(o), std::move(a), std::move(rs), std::move(s), abort_on_error) + reduce(source_type o, accumulator_type a, result_selector_type rs, seed_type s) + : initial(std::move(o), std::move(a), std::move(rs), std::move(s)) { } template<class Subscriber> @@ -138,39 +136,28 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, SourceOp reduce_state_type& operator=(reduce_state_type o) RXCPP_DELETE; }; auto state = std::make_shared<reduce_state_type>(initial, std::move(o)); - auto on_completed = [state]() { - rxu::maybe<value_type> result; - try { - result.reset(state->result_selector(state->current)); - } catch (const std::exception&) { - state->out.on_error(std::current_exception()); - return; - } - state->out.on_next(result.get()); - state->out.on_completed(); - }; state->source.subscribe( state->out, // on_next [state](T t) { - try { - auto next = state->accumulator(state->current, t); - state->current = next; - } catch (const std::exception&) { - state->out.on_error(std::current_exception()); - } + auto next = state->accumulator(state->current, t); + state->current = next; }, // on_error - [state, on_completed](std::exception_ptr e) { - if (state->abort_on_error) { - state->out.on_error(e); - } - else { - on_completed(); - } + [state](std::exception_ptr e) { + state->out.on_error(e); }, // on_completed - on_completed + [state]() { + auto result = on_exception( + [&](){return state->result_selector(state->current);}, + state->out); + if (result.empty()) { + return; + } + state->out.on_next(result.get()); + state->out.on_completed(); + } ); } private: @@ -256,35 +243,27 @@ struct average { } return avg; } - throw rxcpp::empty_error("No elements"); + throw rxcpp::empty_error("average() requires a stream with at least one value"); } }; template<class T> struct sum { - struct seed_type - { - seed_type() - : value() - , has_accumulation(false) - { - } - T value; - bool has_accumulation; - }; + typedef rxu::maybe<T> seed_type; seed_type seed() { - return seed_type{}; + return seed_type(); } seed_type operator()(seed_type a, T v) { - a.value += v; - a.has_accumulation = true; + if (a.empty()) + a.reset(v); + else + *a = *a + v; return a; } T operator()(seed_type a) { - if (a.has_accumulation) { - return a.value; - } - throw rxcpp::empty_error("No elements"); + if (a.empty()) + throw rxcpp::empty_error("sum() requires a stream with at least one value"); + return *a; } }; diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index a77b083..27a8b36 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -293,7 +293,12 @@ public: \snippet output.txt blocking first empty sample */ T first() { - return source.first().as_blocking().last(); + rxu::maybe<T> result; + composite_subscription cs; + subscribe(cs, [&](T v){result.reset(v); cs.unsubscribe();}); + if (result.empty()) + throw rxcpp::empty_error("first() requires a stream with at least one value"); + return result.get(); } /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items. @@ -320,7 +325,7 @@ public: if (!error.empty()) std::rethrow_exception(error.get()); if (result.empty()) - throw rxcpp::empty_error("No elements"); + throw rxcpp::empty_error("last() requires a stream with at least one value"); return result.get(); } @@ -2094,7 +2099,7 @@ public: -> typename defer_reduce<int, rxu::count, rxu::defer_type<identity_for, int>>::observable_type /// \endcond { - return defer_reduce<int, rxu::count, rxu::defer_type<identity_for, int>>::make(source_operator, rxu::count(), identity_for<int>(), 0, false); + return defer_reduce<int, rxu::count, rxu::defer_type<identity_for, int>>::make(source_operator, rxu::count(), identity_for<int>(), 0); } /*! For each item from this observable reduce it by adding to the previous items. @@ -2118,7 +2123,7 @@ public: -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>>::observable_type /// \endcond { - return defer_reduce<rxu::defer_seed_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>>::make(source_operator, rxo::detail::sum<T>(), rxo::detail::sum<T>(), rxo::detail::sum<T>().seed(), false); + return defer_reduce<rxu::defer_seed_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>, rxu::defer_type<rxo::detail::sum, T>>::make(source_operator, rxo::detail::sum<T>(), rxo::detail::sum<T>(), rxo::detail::sum<T>().seed()); } /*! For each item from this observable reduce it by adding to the previous values and then dividing by the number of items at the end. @@ -2142,7 +2147,7 @@ public: -> typename defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::observable_type /// \endcond { - return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed(), false); + return defer_reduce<rxu::defer_seed_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>, rxu::defer_type<rxo::detail::average, T>>::make(source_operator, rxo::detail::average<T>(), rxo::detail::average<T>(), rxo::detail::average<T>().seed()); } /*! For each item from this observable use Accumulator to combine items into a value that will be emitted from the new observable that is returned. @@ -2503,7 +2508,7 @@ auto observable<T, SourceOperator>::last() const return this->reduce( seed, [](rxu::maybe<T>, T t){return rxu::maybe<T>(std::move(t));}, - [](rxu::maybe<T> result){return result.empty() ? throw rxcpp::empty_error("No elements") : result.get();}); + [](rxu::maybe<T> result){return result.empty() ? throw rxcpp::empty_error("last() requires a stream with at least one value") : result.get();}); } template<class T, class SourceOperator> |