summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2015-06-04 14:40:42 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2015-06-04 14:40:42 -0700
commit88d687c580d3b3a9804ba6457349c91b1a80e6ad (patch)
tree1cf1134ac902939953919073eb0de2a1c01ab969 /Rx
parent3ea9d3a58408847655e86aec0548ddbea0e7aeb5 (diff)
downloadRxCpp-88d687c580d3b3a9804ba6457349c91b1a80e6ad.tar.gz
as_blocking().subscribe() now throws when on_error terminates the stream.
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp22
1 files changed, 18 insertions, 4 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 2cf47dc..8f8aec7 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -189,7 +189,8 @@ class blocking_observable
-> composite_subscription {
std::mutex lock;
std::condition_variable wake;
- composite_subscription cs;
+ std::exception_ptr error;
+
struct tracking
{
~tracking()
@@ -210,8 +211,17 @@ class blocking_observable
};
auto track = std::make_shared<tracking>();
- auto scbr = make_subscriber<T>(std::forward<ArgN>(an)...);
- cs = scbr.get_subscription();
+ auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
+
+ // keep any error to rethrow at the end.
+ auto scbr = make_subscriber<T>(
+ dest,
+ [&](T t){dest.on_next(t);},
+ [&](std::exception_ptr e){dest.on_error(e); error = e;},
+ [&](){dest.on_completed();}
+ );
+
+ auto cs = scbr.get_subscription();
cs.add(
[&, track](){
// OSX geting invalid x86 op if notify_one is after the disposed = true
@@ -220,8 +230,10 @@ class blocking_observable
wake.notify_one();
track->disposed = true;
});
+
std::unique_lock<std::mutex> guard(lock);
source.subscribe(std::move(scbr));
+
wake.wait(guard,
[&, track](){
// this is really not good.
@@ -236,7 +248,9 @@ class blocking_observable
});
track->wakened = true;
if (!track->disposed || !track->wakened) abort();
- return composite_subscription::empty();
+
+ if (error) {std::rethrow_exception(error);}
+ return cs;
}
public: