diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-04 14:40:42 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-04 14:40:42 -0700 |
commit | 88d687c580d3b3a9804ba6457349c91b1a80e6ad (patch) | |
tree | 1cf1134ac902939953919073eb0de2a1c01ab969 /Rx | |
parent | 3ea9d3a58408847655e86aec0548ddbea0e7aeb5 (diff) | |
download | RxCpp-88d687c580d3b3a9804ba6457349c91b1a80e6ad.tar.gz |
as_blocking().subscribe() now throws when on_error terminates the stream.
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 2cf47dc..8f8aec7 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -189,7 +189,8 @@ class blocking_observable -> composite_subscription { std::mutex lock; std::condition_variable wake; - composite_subscription cs; + std::exception_ptr error; + struct tracking { ~tracking() @@ -210,8 +211,17 @@ class blocking_observable }; auto track = std::make_shared<tracking>(); - auto scbr = make_subscriber<T>(std::forward<ArgN>(an)...); - cs = scbr.get_subscription(); + auto dest = make_subscriber<T>(std::forward<ArgN>(an)...); + + // keep any error to rethrow at the end. + auto scbr = make_subscriber<T>( + dest, + [&](T t){dest.on_next(t);}, + [&](std::exception_ptr e){dest.on_error(e); error = e;}, + [&](){dest.on_completed();} + ); + + auto cs = scbr.get_subscription(); cs.add( [&, track](){ // OSX geting invalid x86 op if notify_one is after the disposed = true @@ -220,8 +230,10 @@ class blocking_observable wake.notify_one(); track->disposed = true; }); + std::unique_lock<std::mutex> guard(lock); source.subscribe(std::move(scbr)); + wake.wait(guard, [&, track](){ // this is really not good. @@ -236,7 +248,9 @@ class blocking_observable }); track->wakened = true; if (!track->disposed || !track->wakened) abort(); - return composite_subscription::empty(); + + if (error) {std::rethrow_exception(error);} + return cs; } public: |