diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-09 10:04:04 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2015-06-09 10:04:04 -0700 |
commit | ff99d5ef176f38dd389616220415b74d0bf33e4c (patch) | |
tree | 1b27718cf3e25c6867b8f8f0abdf65db5c72f3c8 /Rx | |
parent | 4af4c2edc3c9e6e8f66bb3e61f4b3653eeae0f07 (diff) | |
download | RxCpp-ff99d5ef176f38dd389616220415b74d0bf33e4c.tar.gz |
refinements to blocking, reduce and docs
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-observable.hpp | 83 |
1 files changed, 53 insertions, 30 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 5a292f9..b8bb689 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -186,7 +186,7 @@ class blocking_observable { template<class Obsvbl, class... ArgN> static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an) - -> composite_subscription { + -> void { std::mutex lock; std::condition_variable wake; std::exception_ptr error; @@ -218,9 +218,10 @@ class blocking_observable dest, [&](T t){dest.on_next(t);}, [&](std::exception_ptr e){ - dest.on_error(e); if (do_rethrow) { error = e; + } else { + dest.on_error(e); } }, [&](){dest.on_completed();} @@ -255,7 +256,6 @@ class blocking_observable if (!track->disposed || !track->wakened) abort(); if (error) {std::rethrow_exception(error);} - return cs; } public: @@ -267,24 +267,48 @@ public: blocking_observable(observable_type s) : source(std::move(s)) {} /// - /// subscribe will cause this observable to emit values to the provided subscriber. + /// `subscribe` will cause this observable to emit values to the provided subscriber. + /// + /// \return void + /// + /// \param an... - the arguments are passed to make_subscriber(). + /// /// callers must provide enough arguments to make a subscriber. /// overrides are supported. thus - /// subscribe(thesubscriber, composite_subscription()) - /// will take thesubscriber.get_observer() and the provided + /// `subscribe(thesubscriber, composite_subscription())` + /// will take `thesubscriber.get_observer()` and the provided /// subscription and subscribe to the new subscriber. - /// the on_next, on_error, on_completed methods can be supplied instead of an observer + /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer /// if a subscription or subscriber is not provided then a new subscription will be created. /// template<class... ArgN> auto subscribe(ArgN&&... an) const - -> composite_subscription { + -> void { return blocking_subscribe(source, false, std::forward<ArgN>(an)...); } + /// + /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber. + /// + /// \note If the source observable calls on_error, the raised exception is rethrown by this method. + /// + /// \note If the source observable calls on_error, the `on_error` method on the subscriber will not be called. + /// + /// \return void + /// + /// \param an... - the arguments are passed to make_subscriber(). + /// + /// callers must provide enough arguments to make a subscriber. + /// overrides are supported. thus + /// `subscribe(thesubscriber, composite_subscription())` + /// will take `thesubscriber.get_observer()` and the provided + /// subscription and subscribe to the new subscriber. + /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer + /// if a subscription or subscriber is not provided then a new subscription will be created. + /// template<class... ArgN> auto subscribe_with_rethrow(ArgN&&... an) const - -> composite_subscription { + -> void { return blocking_subscribe(source, true, std::forward<ArgN>(an)...); } @@ -305,12 +329,10 @@ public: */ T first() { rxu::maybe<T> result; - rxu::maybe<std::exception_ptr> error; composite_subscription cs; subscribe_with_rethrow( cs, - [&](T v){result.reset(v); cs.unsubscribe();}, - [&](std::exception_ptr){}); + [&](T v){result.reset(v); cs.unsubscribe();}); if (result.empty()) throw rxcpp::empty_error("first() requires a stream with at least one value"); return result.get(); @@ -333,10 +355,8 @@ public: */ T last() const { rxu::maybe<T> result; - rxu::maybe<std::exception_ptr> error; subscribe_with_rethrow( - [&](T v){result.reset(v);}, - [&](std::exception_ptr){}); + [&](T v){result.reset(v);}); if (result.empty()) throw rxcpp::empty_error("last() requires a stream with at least one value"); return result.get(); @@ -350,15 +370,14 @@ public: \snippet blocking_observable.cpp blocking count sample \snippet output.txt blocking count sample - If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the number of elements: + When the source observable calls on_error: \snippet blocking_observable.cpp blocking count error sample \snippet output.txt blocking count error sample */ int count() const { - int result; - on_exception( - [&](){result = source.count().as_blocking().last(); return true;}, - [&](std::exception_ptr){result = 0;}); + int result = 0; + source.count().as_blocking().subscribe_with_rethrow( + [&](int v){result = v;}); return result; } @@ -375,7 +394,7 @@ public: \snippet blocking_observable.cpp blocking sum empty sample \snippet output.txt blocking sum empty sample - If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the average value of elements: + When the source observable calls on_error: \snippet blocking_observable.cpp blocking sum error sample \snippet output.txt blocking sum error sample */ @@ -396,7 +415,7 @@ public: \snippet blocking_observable.cpp blocking average empty sample \snippet output.txt blocking average empty sample - If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the average value of elements: + When the source observable calls on_error: \snippet blocking_observable.cpp blocking average error sample \snippet output.txt blocking average error sample */ @@ -2077,7 +2096,7 @@ public: \snippet math.cpp first sample \snippet output.txt first sample - If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers. + When the source observable calls on_error: \snippet math.cpp first empty sample \snippet output.txt first empty sample */ @@ -2092,7 +2111,7 @@ public: \snippet math.cpp last sample \snippet output.txt last sample - If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers. + When the source observable calls on_error: \snippet math.cpp last empty sample \snippet output.txt last empty sample */ @@ -2107,7 +2126,7 @@ public: \snippet math.cpp count sample \snippet output.txt count sample - If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the number of elements: + When the source observable calls on_error: \snippet math.cpp count error sample \snippet output.txt count error sample */ @@ -2127,11 +2146,11 @@ public: \snippet math.cpp sum sample \snippet output.txt sum sample - If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers: + When the source observable completes without emitting any items: \snippet math.cpp sum empty sample \snippet output.txt sum empty sample - If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the sum of elements: + When the source observable calls on_error: \snippet math.cpp sum error sample \snippet output.txt sum error sample */ @@ -2151,11 +2170,11 @@ public: \snippet math.cpp average sample \snippet output.txt average sample - If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers: + When the source observable completes without emitting any items: \snippet math.cpp average empty sample \snippet output.txt average empty sample - If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the average value of elements: + When the source observable calls on_error: \snippet math.cpp average error sample \snippet output.txt average error sample */ @@ -2531,7 +2550,11 @@ auto observable<T, SourceOperator>::last() const template<class T, class SourceOperator> auto observable<T, SourceOperator>::first() const -> observable<T> { - return this->take(1).last(); + rxu::maybe<T> seed; + return this->take(1).reduce( + seed, + [](rxu::maybe<T>, T t){return rxu::maybe<T>(std::move(t));}, + [](rxu::maybe<T> result){return result.empty() ? throw rxcpp::empty_error("first() requires a stream with at least one value") : result.get();}); } template<class T, class SourceOperator> |