summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2015-06-09 10:04:04 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2015-06-09 10:04:04 -0700
commitff99d5ef176f38dd389616220415b74d0bf33e4c (patch)
tree1b27718cf3e25c6867b8f8f0abdf65db5c72f3c8 /Rx
parent4af4c2edc3c9e6e8f66bb3e61f4b3653eeae0f07 (diff)
downloadRxCpp-ff99d5ef176f38dd389616220415b74d0bf33e4c.tar.gz
refinements to blocking, reduce and docs
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp83
1 files changed, 53 insertions, 30 deletions
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 5a292f9..b8bb689 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -186,7 +186,7 @@ class blocking_observable
{
template<class Obsvbl, class... ArgN>
static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
- -> composite_subscription {
+ -> void {
std::mutex lock;
std::condition_variable wake;
std::exception_ptr error;
@@ -218,9 +218,10 @@ class blocking_observable
dest,
[&](T t){dest.on_next(t);},
[&](std::exception_ptr e){
- dest.on_error(e);
if (do_rethrow) {
error = e;
+ } else {
+ dest.on_error(e);
}
},
[&](){dest.on_completed();}
@@ -255,7 +256,6 @@ class blocking_observable
if (!track->disposed || !track->wakened) abort();
if (error) {std::rethrow_exception(error);}
- return cs;
}
public:
@@ -267,24 +267,48 @@ public:
blocking_observable(observable_type s) : source(std::move(s)) {}
///
- /// subscribe will cause this observable to emit values to the provided subscriber.
+ /// `subscribe` will cause this observable to emit values to the provided subscriber.
+ ///
+ /// \return void
+ ///
+ /// \param an... - the arguments are passed to make_subscriber().
+ ///
/// callers must provide enough arguments to make a subscriber.
/// overrides are supported. thus
- /// subscribe(thesubscriber, composite_subscription())
- /// will take thesubscriber.get_observer() and the provided
+ /// `subscribe(thesubscriber, composite_subscription())`
+ /// will take `thesubscriber.get_observer()` and the provided
/// subscription and subscribe to the new subscriber.
- /// the on_next, on_error, on_completed methods can be supplied instead of an observer
+ /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
/// if a subscription or subscriber is not provided then a new subscription will be created.
///
template<class... ArgN>
auto subscribe(ArgN&&... an) const
- -> composite_subscription {
+ -> void {
return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
}
+ ///
+ /// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber.
+ ///
+ /// \note If the source observable calls on_error, the raised exception is rethrown by this method.
+ ///
+ /// \note If the source observable calls on_error, the `on_error` method on the subscriber will not be called.
+ ///
+ /// \return void
+ ///
+ /// \param an... - the arguments are passed to make_subscriber().
+ ///
+ /// callers must provide enough arguments to make a subscriber.
+ /// overrides are supported. thus
+ /// `subscribe(thesubscriber, composite_subscription())`
+ /// will take `thesubscriber.get_observer()` and the provided
+ /// subscription and subscribe to the new subscriber.
+ /// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
+ /// if a subscription or subscriber is not provided then a new subscription will be created.
+ ///
template<class... ArgN>
auto subscribe_with_rethrow(ArgN&&... an) const
- -> composite_subscription {
+ -> void {
return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
}
@@ -305,12 +329,10 @@ public:
*/
T first() {
rxu::maybe<T> result;
- rxu::maybe<std::exception_ptr> error;
composite_subscription cs;
subscribe_with_rethrow(
cs,
- [&](T v){result.reset(v); cs.unsubscribe();},
- [&](std::exception_ptr){});
+ [&](T v){result.reset(v); cs.unsubscribe();});
if (result.empty())
throw rxcpp::empty_error("first() requires a stream with at least one value");
return result.get();
@@ -333,10 +355,8 @@ public:
*/
T last() const {
rxu::maybe<T> result;
- rxu::maybe<std::exception_ptr> error;
subscribe_with_rethrow(
- [&](T v){result.reset(v);},
- [&](std::exception_ptr){});
+ [&](T v){result.reset(v);});
if (result.empty())
throw rxcpp::empty_error("last() requires a stream with at least one value");
return result.get();
@@ -350,15 +370,14 @@ public:
\snippet blocking_observable.cpp blocking count sample
\snippet output.txt blocking count sample
- If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the number of elements:
+ When the source observable calls on_error:
\snippet blocking_observable.cpp blocking count error sample
\snippet output.txt blocking count error sample
*/
int count() const {
- int result;
- on_exception(
- [&](){result = source.count().as_blocking().last(); return true;},
- [&](std::exception_ptr){result = 0;});
+ int result = 0;
+ source.count().as_blocking().subscribe_with_rethrow(
+ [&](int v){result = v;});
return result;
}
@@ -375,7 +394,7 @@ public:
\snippet blocking_observable.cpp blocking sum empty sample
\snippet output.txt blocking sum empty sample
- If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the average value of elements:
+ When the source observable calls on_error:
\snippet blocking_observable.cpp blocking sum error sample
\snippet output.txt blocking sum error sample
*/
@@ -396,7 +415,7 @@ public:
\snippet blocking_observable.cpp blocking average empty sample
\snippet output.txt blocking average empty sample
- If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the average value of elements:
+ When the source observable calls on_error:
\snippet blocking_observable.cpp blocking average error sample
\snippet output.txt blocking average error sample
*/
@@ -2077,7 +2096,7 @@ public:
\snippet math.cpp first sample
\snippet output.txt first sample
- If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers.
+ When the source observable calls on_error:
\snippet math.cpp first empty sample
\snippet output.txt first empty sample
*/
@@ -2092,7 +2111,7 @@ public:
\snippet math.cpp last sample
\snippet output.txt last sample
- If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers.
+ When the source observable calls on_error:
\snippet math.cpp last empty sample
\snippet output.txt last empty sample
*/
@@ -2107,7 +2126,7 @@ public:
\snippet math.cpp count sample
\snippet output.txt count sample
- If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the number of elements:
+ When the source observable calls on_error:
\snippet math.cpp count error sample
\snippet output.txt count error sample
*/
@@ -2127,11 +2146,11 @@ public:
\snippet math.cpp sum sample
\snippet output.txt sum sample
- If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers:
+ When the source observable completes without emitting any items:
\snippet math.cpp sum empty sample
\snippet output.txt sum empty sample
- If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the sum of elements:
+ When the source observable calls on_error:
\snippet math.cpp sum error sample
\snippet output.txt sum error sample
*/
@@ -2151,11 +2170,11 @@ public:
\snippet math.cpp average sample
\snippet output.txt average sample
- If the source observable completes without emitting any items, the resulting observable calls on_error method of its observers:
+ When the source observable completes without emitting any items:
\snippet math.cpp average empty sample
\snippet output.txt average empty sample
- If the source observable calls on_error, the resulting observable behaves like it was on_completed: it emits the average value of elements:
+ When the source observable calls on_error:
\snippet math.cpp average error sample
\snippet output.txt average error sample
*/
@@ -2531,7 +2550,11 @@ auto observable<T, SourceOperator>::last() const
template<class T, class SourceOperator>
auto observable<T, SourceOperator>::first() const
-> observable<T> {
- return this->take(1).last();
+ rxu::maybe<T> seed;
+ return this->take(1).reduce(
+ seed,
+ [](rxu::maybe<T>, T t){return rxu::maybe<T>(std::move(t));},
+ [](rxu::maybe<T> result){return result.empty() ? throw rxcpp::empty_error("first() requires a stream with at least one value") : result.get();});
}
template<class T, class SourceOperator>