summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorValery Kopylov <v-valkop@microsoft.com>2015-06-02 18:09:09 +0300
committerValery Kopylov <v-valkop@microsoft.com>2015-06-09 13:00:55 +0300
commit7f9b44a512182cea0883b989ef3980fda48df5e4 (patch)
tree0fe0cecffa2c9ac5351cb393af348b1b1e553c32 /Rx
parenta9a4aea066ed833a569559d3662dca82af4d9a32 (diff)
downloadRxCpp-7f9b44a512182cea0883b989ef3980fda48df5e4.tar.gz
Rethrow source exceptions on blocking_observable aggregate methods
+add docs for them
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/examples/doxygen/blocking_observable.cpp92
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp87
2 files changed, 176 insertions, 3 deletions
diff --git a/Rx/v2/examples/doxygen/blocking_observable.cpp b/Rx/v2/examples/doxygen/blocking_observable.cpp
new file mode 100644
index 0000000..eb3dc30
--- /dev/null
+++ b/Rx/v2/examples/doxygen/blocking_observable.cpp
@@ -0,0 +1,92 @@
+#include "rxcpp/rx.hpp"
+
+#include "rxcpp/rx-test.hpp"
+#include "catch.hpp"
+
+SCENARIO("blocking first sample"){
+ printf("//! [blocking first sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).as_blocking();
+ auto first = values.first();
+ printf("first = %d\n", first);
+ printf("//! [blocking first sample]\n");
+}
+
+SCENARIO("blocking first empty sample"){
+ printf("//! [blocking first empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().as_blocking();
+ try {
+ auto first = values.first();
+ printf("first = %d\n", first);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking first empty sample]\n");
+}
+
+SCENARIO("blocking last sample"){
+ printf("//! [blocking last sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).as_blocking();
+ auto last = values.last();
+ printf("last = %d\n", last);
+ printf("//! [blocking last sample]\n");
+}
+
+SCENARIO("blocking last empty sample"){
+ printf("//! [blocking last empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().as_blocking();
+ try {
+ auto last = values.last();
+ printf("last = %d\n", last);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking last empty sample]\n");
+}
+
+SCENARIO("blocking count sample"){
+ printf("//! [blocking count sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).as_blocking();
+ auto count = values.count();
+ printf("count = %d\n", count);
+ printf("//! [blocking count sample]\n");
+}
+
+SCENARIO("blocking sum sample"){
+ printf("//! [blocking sum sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).as_blocking();
+ auto sum = values.sum();
+ printf("sum = %d\n", sum);
+ printf("//! [blocking sum sample]\n");
+}
+
+SCENARIO("blocking sum empty sample"){
+ printf("//! [blocking sum empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().as_blocking();
+ try {
+ auto sum = values.sum();
+ printf("sum = %d\n", sum);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking sum empty sample]\n");
+}
+
+SCENARIO("blocking average sample"){
+ printf("//! [blocking average sample]\n");
+ auto values = rxcpp::observable<>::range(1, 3).as_blocking();
+ auto average = values.average();
+ printf("average = %d\n", average);
+ printf("//! [blocking average sample]\n");
+}
+
+SCENARIO("blocking average empty sample"){
+ printf("//! [blocking average empty sample]\n");
+ auto values = rxcpp::observable<>::empty<int>().as_blocking();
+ try {
+ auto average = values.average();
+ printf("average = %d\n", average);
+ } catch (const std::exception& ex) {
+ printf("Exception: %s\n", ex.what());
+ }
+ printf("//! [blocking average empty sample]\n");
+}
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index ffe7450..0c87026 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -277,27 +277,108 @@ public:
return blocking_subscribe(source, std::forward<ArgN>(an)...);
}
+ /*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
+
+ \return The first item emitted by this blocking_observable.
+
+ \note If the source observable calls on_error, the raised exception is rethrown by this method.
+
+ \sample
+ When the source observable emits at least one item:
+ \snippet blocking_observable.cpp blocking first sample
+ \snippet output.txt blocking first sample
+
+ When the source observable is empty:
+ \snippet blocking_observable.cpp blocking first empty sample
+ \snippet output.txt blocking first empty sample
+ */
T first() {
return source.first().as_blocking().last();
}
+ /*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
+
+ \return The last item emitted by this blocking_observable.
+
+ \note If the source observable calls on_error, the raised exception is rethrown by this method.
+
+ \sample
+ When the source observable emits at least one item:
+ \snippet blocking_observable.cpp blocking last sample
+ \snippet output.txt blocking last sample
+
+ When the source observable is empty:
+ \snippet blocking_observable.cpp blocking last empty sample
+ \snippet output.txt blocking last empty sample
+ */
T last() const {
rxu::maybe<T> result;
- subscribe([&](T v){result.reset(v);});
- if (result.empty()) throw std::runtime_error("No elements");
+ rxu::maybe<std::exception_ptr> error;
+ subscribe(
+ [&](T v){result.reset(v);},
+ [&](std::exception_ptr ep){error.reset(ep);});
+ if (!error.empty())
+ std::rethrow_exception(error.get());
+ if (result.empty())
+ throw std::runtime_error("No elements");
return result.get();
}
+ /*! Return the total number of items emitted by this blocking_observable.
+
+ \return The total number of items emitted by this blocking_observable.
+
+ \note If the source observable calls on_error, the raised exception is rethrown by this method.
+
+ \sample
+ \snippet blocking_observable.cpp blocking count sample
+ \snippet output.txt blocking count sample
+ */
int count() const {
rxu::maybe<T> result;
+ rxu::maybe<std::exception_ptr> error;
source.count().as_blocking().subscribe(
[&](T v){result.reset(v);},
- [](std::exception_ptr){result.reset(0);});
+ [&](std::exception_ptr ep){error.reset(ep);});
+ if (!error.empty())
+ std::rethrow_exception(error.get());
return result.get();
}
+
+ /*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
+
+ \return The sum of all items emitted by this blocking_observable.
+
+ \note If the source observable calls on_error, the raised exception is rethrown by this method.
+
+ \sample
+ When the source observable emits at least one item:
+ \snippet blocking_observable.cpp blocking sum sample
+ \snippet output.txt blocking sum sample
+
+ When the source observable is empty:
+ \snippet blocking_observable.cpp blocking sum empty sample
+ \snippet output.txt blocking sum empty sample
+ */
T sum() const {
return source.sum().as_blocking().last();
}
+
+ /*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
+
+ \return The average value of all items emitted by this blocking_observable.
+
+ \note If the source observable calls on_error, the raised exception is rethrown by this method.
+
+ \sample
+ When the source observable emits at least one item:
+ \snippet blocking_observable.cpp blocking average sample
+ \snippet output.txt blocking average sample
+
+ When the source observable is empty:
+ \snippet blocking_observable.cpp blocking average empty sample
+ \snippet output.txt blocking average empty sample
+ */
double average() const {
return source.average().as_blocking().last();
}