summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-all.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-amb.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-any.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-combine_latest.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-concat_map.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-debounce.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-delay.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-distinct.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-element_at.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-filter.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-finally.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-flat_map.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-group_by.hpp6
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-map.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-merge.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp12
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-observe_on.hpp9
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp10
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-pairwise.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-reduce.hpp14
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-repeat.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-retry.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-sample_time.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-scan.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-sequence_equal.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-skip.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-skip_last.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-skip_until.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-skip_while.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-switch_if_empty.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take_last.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take_until.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take_while.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-tap.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-time_interval.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-timeout.hpp4
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-timestamp.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window_time.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp6
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp2
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-zip.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-composite_exception.hpp4
-rw-r--r--Rx/v2/src/rxcpp/rx-coordination.hpp2
-rw-r--r--Rx/v2/src/rxcpp/rx-includes.hpp16
-rw-r--r--Rx/v2/src/rxcpp/rx-notification.hpp28
-rw-r--r--Rx/v2/src/rxcpp/rx-observable.hpp19
-rw-r--r--Rx/v2/src/rxcpp/rx-observer.hpp40
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp10
-rw-r--r--Rx/v2/src/rxcpp/rx-trace.hpp4
-rw-r--r--Rx/v2/src/rxcpp/rx-util.hpp146
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-test.hpp2
-rw-r--r--Rx/v2/src/rxcpp/sources/rx-error.hpp25
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-subject.hpp4
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp10
65 files changed, 314 insertions, 167 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-all.hpp b/Rx/v2/src/rxcpp/operators/rx-all.hpp
index 65bec46..a0f6a3e 100644
--- a/Rx/v2/src/rxcpp/operators/rx-all.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-all.hpp
@@ -83,7 +83,7 @@ struct all
dest.on_completed();
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-amb.hpp b/Rx/v2/src/rxcpp/operators/rx-amb.hpp
index 595dc4d..56bfbe9 100644
--- a/Rx/v2/src/rxcpp/operators/rx-amb.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-amb.hpp
@@ -183,7 +183,7 @@ struct amb
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
//on_completed
@@ -196,7 +196,7 @@ struct amb
selectedSource.subscribe(std::move(selectedSinkInner));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-any.hpp b/Rx/v2/src/rxcpp/operators/rx-any.hpp
index 4488375..19fca6e 100644
--- a/Rx/v2/src/rxcpp/operators/rx-any.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-any.hpp
@@ -90,7 +90,7 @@ struct any
dest.on_completed();
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
index a4dea3b..79eb30e 100644
--- a/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-buffer_count.hpp
@@ -96,7 +96,7 @@ struct buffer_count
chunks.pop_front();
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
index 7e54eb8..aa94f7b 100644
--- a/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time.hpp
@@ -197,7 +197,7 @@ struct buffer_with_time
}
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&){
localState->dest.on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp b/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp
index 346a6ef..6d4b9a4 100644
--- a/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-buffer_time_count.hpp
@@ -179,7 +179,7 @@ struct buffer_with_time_or_count
}
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&){
localState->dest.on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-combine_latest.hpp b/Rx/v2/src/rxcpp/operators/rx-combine_latest.hpp
index 0f94fde..06ff3d1 100644
--- a/Rx/v2/src/rxcpp/operators/rx-combine_latest.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-combine_latest.hpp
@@ -173,7 +173,7 @@ struct combine_latest : public operator_base<rxu::value_type_t<combine_latest_tr
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat.hpp b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
index 57b42fa..457b9cf 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat.hpp
@@ -143,7 +143,7 @@ struct concat
state->out.on_next(ct);
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
//on_completed
@@ -207,7 +207,7 @@ struct concat
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
index edcdb26..546c1eb 100644
--- a/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-concat_map.hpp
@@ -183,7 +183,7 @@ struct concat_map
state->out.on_next(std::move(selectedResult));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
//on_completed
@@ -246,7 +246,7 @@ struct concat_map
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-debounce.hpp b/Rx/v2/src/rxcpp/operators/rx-debounce.hpp
index 5f56ff9..6fbedd7 100644
--- a/Rx/v2/src/rxcpp/operators/rx-debounce.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-debounce.hpp
@@ -158,7 +158,7 @@ struct debounce
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&) {
localState->dest.on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-delay.hpp b/Rx/v2/src/rxcpp/operators/rx-delay.hpp
index 19bbed5..5986f79 100644
--- a/Rx/v2/src/rxcpp/operators/rx-delay.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-delay.hpp
@@ -131,7 +131,7 @@ struct delay
localState->worker.schedule(localState->worker.now() + localState->period, selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&){
localState->dest.on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-distinct.hpp b/Rx/v2/src/rxcpp/operators/rx-distinct.hpp
index c3ca086..c90ebdb 100644
--- a/Rx/v2/src/rxcpp/operators/rx-distinct.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-distinct.hpp
@@ -61,7 +61,7 @@ struct distinct
dest.on_next(v);
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
index 7d8ef5a..3702185 100644
--- a/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-distinct_until_changed.hpp
@@ -74,7 +74,7 @@ struct distinct_until_changed
dest.on_next(v);
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-element_at.hpp b/Rx/v2/src/rxcpp/operators/rx-element_at.hpp
index 5cbe6dc..1d773ac 100644
--- a/Rx/v2/src/rxcpp/operators/rx-element_at.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-element_at.hpp
@@ -77,12 +77,12 @@ struct element_at {
dest.on_completed();
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
if(current <= this->index) {
- dest.on_error(std::make_exception_ptr(std::range_error("index is out of bounds")));
+ dest.on_error(rxu::make_error_ptr(std::range_error("index is out of bounds")));
}
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-filter.hpp b/Rx/v2/src/rxcpp/operators/rx-filter.hpp
index 86ce649..3a622bb 100644
--- a/Rx/v2/src/rxcpp/operators/rx-filter.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-filter.hpp
@@ -79,7 +79,7 @@ struct filter
dest.on_next(std::forward<Value>(v));
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-finally.hpp b/Rx/v2/src/rxcpp/operators/rx-finally.hpp
index 4e4416c..d6f5487 100644
--- a/Rx/v2/src/rxcpp/operators/rx-finally.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-finally.hpp
@@ -70,7 +70,7 @@ struct finally
void on_next(source_value_type v) const {
dest.on_next(v);
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
index 5b68b37..eb76198 100644
--- a/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-flat_map.hpp
@@ -198,7 +198,7 @@ struct flat_map
state->out.on_next(std::move(selectedResult));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
//on_completed
@@ -213,7 +213,7 @@ struct flat_map
selectedSource.subscribe(std::move(selectedSinkInner));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
index 4eadbab..f702fba 100644
--- a/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-group_by.hpp
@@ -195,7 +195,7 @@ struct group_by
auto selectedKey = on_exception(
[&](){
return this->keySelector(v);},
- [this](std::exception_ptr e){on_error(e);});
+ [this](rxu::error_ptr e){on_error(e);});
if (selectedKey.empty()) {
return;
}
@@ -211,13 +211,13 @@ struct group_by
auto selectedMarble = on_exception(
[&](){
return this->marbleSelector(v);},
- [this](std::exception_ptr e){on_error(e);});
+ [this](rxu::error_ptr e){on_error(e);});
if (selectedMarble.empty()) {
return;
}
g->second.on_next(std::move(selectedMarble.get()));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
for(auto& g : state->groups) {
g.second.on_error(e);
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp b/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp
index 82fbe95..00b10a6 100644
--- a/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-ignore_elements.hpp
@@ -56,7 +56,7 @@ struct ignore_elements {
// no-op; ignore element
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-map.hpp b/Rx/v2/src/rxcpp/operators/rx-map.hpp
index a670d88..7570376 100644
--- a/Rx/v2/src/rxcpp/operators/rx-map.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-map.hpp
@@ -77,7 +77,7 @@ struct map
}
dest.on_next(std::move(selected.get()));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-merge.hpp b/Rx/v2/src/rxcpp/operators/rx-merge.hpp
index ad8f7d0..cc1ea77 100644
--- a/Rx/v2/src/rxcpp/operators/rx-merge.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-merge.hpp
@@ -173,7 +173,7 @@ struct merge
state->out.on_next(std::move(ct));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
//on_completed
@@ -188,7 +188,7 @@ struct merge
selectedSource.subscribe(std::move(selectedSinkInner));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp b/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp
index 43fdee3..51f8867 100644
--- a/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-merge_delay_error.hpp
@@ -167,10 +167,10 @@ struct merge_delay_error
state->out.on_next(std::move(ct));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
if(--state->pendingCompletions == 0) {
state->out.on_error(
- std::make_exception_ptr(std::move(state->exception.add(e))));
+ rxu::make_error_ptr(std::move(state->exception.add(e))));
} else {
state->exception.add(e);
}
@@ -180,7 +180,7 @@ struct merge_delay_error
if (--state->pendingCompletions == 0) {
if(!state->exception.empty()) {
state->out.on_error(
- std::make_exception_ptr(std::move(state->exception)));
+ rxu::make_error_ptr(std::move(state->exception)));
} else {
state->out.on_completed();
}
@@ -192,10 +192,10 @@ struct merge_delay_error
selectedSource.subscribe(std::move(selectedSinkInner));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
if(--state->pendingCompletions == 0) {
state->out.on_error(
- std::make_exception_ptr(std::move(state->exception.add(e))));
+ rxu::make_error_ptr(std::move(state->exception.add(e))));
} else {
state->exception.add(e);
}
@@ -205,7 +205,7 @@ struct merge_delay_error
if (--state->pendingCompletions == 0) {
if(!state->exception.empty()) {
state->out.on_error(
- std::make_exception_ptr(std::move(state->exception)));
+ rxu::make_error_ptr(std::move(state->exception)));
} else {
state->out.on_completed();
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
index 99de4c3..c1d59a9 100644
--- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp
@@ -127,7 +127,7 @@ struct observe_on
auto drain = [keepAlive, this](const rxsc::schedulable& self){
using std::swap;
- try {
+ RXCPP_TRY {
for (;;) {
if (drain_queue.empty() || !destination.is_subscribed()) {
std::unique_lock<std::mutex> guard(lock);
@@ -151,8 +151,9 @@ struct observe_on
self();
if (lifetime.is_subscribed()) break;
}
- } catch(...) {
- destination.on_error(std::current_exception());
+ }
+ RXCPP_CATCH(...) {
+ destination.on_error(rxu::current_exception());
std::unique_lock<std::mutex> guard(lock);
finish(guard, mode::Errored);
}
@@ -188,7 +189,7 @@ struct observe_on
state->fill_queue.push_back(notification_type::on_next(std::move(v)));
state->ensure_processing(guard);
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
std::unique_lock<std::mutex> guard(state->lock);
if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
state->fill_queue.push_back(notification_type::on_error(e));
diff --git a/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp
index 4ebee38..bc9beba 100644
--- a/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-on_error_resume_next.hpp
@@ -6,9 +6,9 @@
\brief If an error occurs, take the result from the Selector and subscribe to that instead.
- \tparam Selector the actual type of a function of the form `observable<T>(std::exception_ptr)`
+ \tparam Selector the actual type of a function of the form `observable<T>(rxu::error_ptr)`
- \param s the function of the form `observable<T>(std::exception_ptr)`
+ \param s the function of the form `observable<T>(rxu::error_ptr)`
\return Observable that emits the items from the source observable and switches to a new observable on error.
@@ -44,7 +44,7 @@ struct on_error_resume_next
{
typedef rxu::decay_t<T> value_type;
typedef rxu::decay_t<Selector> select_type;
- typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type;
+ typedef decltype((*(select_type*)nullptr)(rxu::error_ptr())) fallback_type;
select_type selector;
on_error_resume_next(select_type s)
@@ -58,7 +58,7 @@ struct on_error_resume_next
typedef on_error_resume_next_observer<Subscriber> this_type;
typedef rxu::decay_t<T> value_type;
typedef rxu::decay_t<Selector> select_type;
- typedef decltype((*(select_type*)nullptr)(std::exception_ptr())) fallback_type;
+ typedef decltype((*(select_type*)nullptr)(rxu::error_ptr())) fallback_type;
typedef rxu::decay_t<Subscriber> dest_type;
typedef observer<T, this_type> observer_type;
dest_type dest;
@@ -75,7 +75,7 @@ struct on_error_resume_next
void on_next(value_type v) const {
dest.on_next(std::move(v));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto selected = on_exception(
[&](){
return this->selector(std::move(e));},
diff --git a/Rx/v2/src/rxcpp/operators/rx-pairwise.hpp b/Rx/v2/src/rxcpp/operators/rx-pairwise.hpp
index 584a6bf..411cf27 100644
--- a/Rx/v2/src/rxcpp/operators/rx-pairwise.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-pairwise.hpp
@@ -67,7 +67,7 @@ struct pairwise
dest.on_next(std::make_tuple(remembered.get(), v));
remembered.reset(v);
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
index 7fc9d08..e5dc819 100644
--- a/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-reduce.hpp
@@ -166,7 +166,7 @@ struct reduce : public operator_base<rxu::value_type_t<reduce_traits<T, Observab
state->current = std::move(next);
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
@@ -244,7 +244,7 @@ struct average {
}
return avg;
}
- throw rxcpp::empty_error("average() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("average() requires a stream with at least one value"));
}
};
@@ -264,7 +264,7 @@ struct sum {
}
T operator()(seed_type a) const {
if (a.empty())
- throw rxcpp::empty_error("sum() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("sum() requires a stream with at least one value"));
return *a;
}
};
@@ -283,7 +283,7 @@ struct max {
}
T operator()(seed_type a) {
if (a.empty())
- throw rxcpp::empty_error("max() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("max() requires a stream with at least one value"));
return *a;
}
};
@@ -302,7 +302,7 @@ struct min {
}
T operator()(seed_type a) {
if (a.empty())
- throw rxcpp::empty_error("min() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("min() requires a stream with at least one value"));
return *a;
}
};
@@ -320,7 +320,7 @@ struct first {
}
T operator()(seed_type a) {
if (a.empty()) {
- throw rxcpp::empty_error("first() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
}
return *a;
}
@@ -339,7 +339,7 @@ struct last {
}
T operator()(seed_type a) {
if (a.empty()) {
- throw rxcpp::empty_error("last() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
}
return *a;
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
index 97ba197..3b9ac89 100644
--- a/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-repeat.hpp
@@ -47,7 +47,7 @@ using repeat_invalid_t = typename repeat_invalid<AN...>::type;
namespace repeat {
struct event_handlers {
template <typename State>
- static inline void on_error(State& state, std::exception_ptr& e) {
+ static inline void on_error(State& state, rxu::error_ptr& e) {
state->out.on_error(e);
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp
index 373d9b3..30a71fe 100644
--- a/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-retry-repeat-common.hpp
@@ -42,7 +42,7 @@ namespace rxcpp {
state->out.on_next(t);
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
EventHandlers::on_error(state, e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-retry.hpp b/Rx/v2/src/rxcpp/operators/rx-retry.hpp
index 3cee7d3..63e5c27 100644
--- a/Rx/v2/src/rxcpp/operators/rx-retry.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-retry.hpp
@@ -43,7 +43,7 @@ using retry_invalid_t = typename retry_invalid<AN...>::type;
namespace retry {
struct event_handlers {
template <typename State>
- static inline void on_error(State& state, std::exception_ptr& e) {
+ static inline void on_error(State& state, rxu::error_ptr& e) {
state->update();
// Use specialized predicate for finite/infinte case
if (state->completed_predicate()) {
diff --git a/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp b/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp
index fe1caa2..f50cbe4 100644
--- a/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-sample_time.hpp
@@ -150,7 +150,7 @@ struct sample_with_time
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&) {
localState->dest.on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-scan.hpp b/Rx/v2/src/rxcpp/operators/rx-scan.hpp
index b03be30..73bcd87 100644
--- a/Rx/v2/src/rxcpp/operators/rx-scan.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-scan.hpp
@@ -90,7 +90,7 @@ struct scan : public operator_base<rxu::decay_t<Seed>>
state->out.on_next(state->result);
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-sequence_equal.hpp b/Rx/v2/src/rxcpp/operators/rx-sequence_equal.hpp
index 06253b4..3350e44 100644
--- a/Rx/v2/src/rxcpp/operators/rx-sequence_equal.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-sequence_equal.hpp
@@ -161,7 +161,7 @@ struct sequence_equal : public operator_base<bool>
check_equal();
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
@@ -189,7 +189,7 @@ struct sequence_equal : public operator_base<bool>
check_equal();
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip.hpp b/Rx/v2/src/rxcpp/operators/rx-skip.hpp
index c7f6c11..b77c4da 100644
--- a/Rx/v2/src/rxcpp/operators/rx-skip.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-skip.hpp
@@ -109,7 +109,7 @@ struct skip : public operator_base<T>
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->mode_value = mode::errored;
state->out.on_error(e);
},
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp
index 6641bd8..9594715 100644
--- a/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-skip_last.hpp
@@ -103,7 +103,7 @@ struct skip_last : public operator_base<T>
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp
index 02a2424..4df6671 100644
--- a/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-skip_until.hpp
@@ -139,7 +139,7 @@ struct skip_until : public operator_base<T>
state->trigger_lifetime.unsubscribe();
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
if (state->mode_value != mode::skipping) {
return;
}
@@ -174,7 +174,7 @@ struct skip_until : public operator_base<T>
state->out.on_next(t);
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
if (state->mode_value > mode::triggered) {
return;
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-skip_while.hpp b/Rx/v2/src/rxcpp/operators/rx-skip_while.hpp
index fdd06b6..643d867 100644
--- a/Rx/v2/src/rxcpp/operators/rx-skip_while.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-skip_while.hpp
@@ -75,7 +75,7 @@ struct skip_while
dest.on_next(v);
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-switch_if_empty.hpp b/Rx/v2/src/rxcpp/operators/rx-switch_if_empty.hpp
index b9bab52..8d3e57e 100644
--- a/Rx/v2/src/rxcpp/operators/rx-switch_if_empty.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-switch_if_empty.hpp
@@ -76,7 +76,7 @@ struct switch_if_empty
is_empty = false;
dest.on_next(std::move(v));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(std::move(e));
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp
index b18963a..4572121 100644
--- a/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-switch_on_next.hpp
@@ -155,7 +155,7 @@ struct switch_on_next
state->out.on_next(std::move(ct));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
//on_completed
@@ -171,7 +171,7 @@ struct switch_on_next
selectedSource.subscribe(std::move(selectedSinkInner));
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-take.hpp b/Rx/v2/src/rxcpp/operators/rx-take.hpp
index 054c136..1e4da4d 100644
--- a/Rx/v2/src/rxcpp/operators/rx-take.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-take.hpp
@@ -112,7 +112,7 @@ struct take : public operator_base<T>
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->mode_value = mode::errored;
state->out.on_error(e);
},
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_last.hpp b/Rx/v2/src/rxcpp/operators/rx-take_last.hpp
index d59b355..12e28b6 100644
--- a/Rx/v2/src/rxcpp/operators/rx-take_last.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-take_last.hpp
@@ -100,7 +100,7 @@ struct take_last : public operator_base<T>
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
index 29aaf88..3fd9b71 100644
--- a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
@@ -147,7 +147,7 @@ struct take_until : public operator_base<T>
state->out.on_completed();
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
if (state->mode_value != mode::taking) {return;}
state->mode_value = mode::errored;
state->out.on_error(e);
@@ -179,7 +179,7 @@ struct take_until : public operator_base<T>
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
if (state->mode_value > mode::clear) {return;}
state->mode_value = mode::errored;
state->out.on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_while.hpp b/Rx/v2/src/rxcpp/operators/rx-take_while.hpp
index de1d57d..85630e7 100644
--- a/Rx/v2/src/rxcpp/operators/rx-take_while.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-take_while.hpp
@@ -73,7 +73,7 @@ struct take_while
dest.on_completed();
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-tap.hpp b/Rx/v2/src/rxcpp/operators/rx-tap.hpp
index 51bfb2f..550163a 100644
--- a/Rx/v2/src/rxcpp/operators/rx-tap.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-tap.hpp
@@ -92,7 +92,7 @@ struct tap
out.on_next(v);
dest.on_next(v);
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
out.on_error(e);
dest.on_error(e);
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp b/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp
index 181c066..1a4c9a5 100644
--- a/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-time_interval.hpp
@@ -84,7 +84,7 @@ struct time_interval
dest.on_next(now - last);
last = now;
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
index 841df12..d100fa4 100644
--- a/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-timeout.hpp
@@ -147,7 +147,7 @@ struct timeout
if(id != state->index)
return;
- state->dest.on_error(std::make_exception_ptr(rxcpp::timeout_error("timeout has occurred")));
+ state->dest.on_error(rxu::make_error_ptr(rxcpp::timeout_error("timeout has occurred")));
};
auto selectedProduce = on_exception(
@@ -178,7 +178,7 @@ struct timeout
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&) {
localState->dest.on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
index d1a11bf..923cf5d 100644
--- a/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-timestamp.hpp
@@ -78,7 +78,7 @@ struct timestamp
void on_next(source_value_type v) const {
dest.on_next(std::make_pair(v, coord.now()));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
dest.on_error(e);
}
void on_completed() const {
diff --git a/Rx/v2/src/rxcpp/operators/rx-window.hpp b/Rx/v2/src/rxcpp/operators/rx-window.hpp
index e5d2c6f..e033a84 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window.hpp
@@ -103,7 +103,7 @@ struct window
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
for (auto s : subj) {
s.get_subscriber().on_error(e);
}
diff --git a/Rx/v2/src/rxcpp/operators/rx-window_time.hpp b/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
index 17d4dd2..57f7572 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window_time.hpp
@@ -195,7 +195,7 @@ struct window_with_time
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&){
for (auto s : localState->subj) {
diff --git a/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp b/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
index 39c64ad..9375737 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window_time_count.hpp
@@ -183,7 +183,7 @@ struct window_with_time_or_count
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&){
localState->subj.get_subscriber().on_error(e);
diff --git a/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp b/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp
index c17d4bc..b9f119a 100644
--- a/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-window_toggle.hpp
@@ -176,7 +176,7 @@ struct window_toggle
innercs.unsubscribe();
},
// on_error
- [localState](std::exception_ptr e) {
+ [localState](rxu::error_ptr e) {
localState->dest.on_error(e);
},
// on_completed
@@ -186,7 +186,7 @@ struct window_toggle
source.subscribe(std::move(selectedSink));
},
// on_error
- [localState](std::exception_ptr e) {
+ [localState](rxu::error_ptr e) {
localState->dest.on_error(e);
},
// on_completed
@@ -218,7 +218,7 @@ struct window_toggle
localState->worker.schedule(selectedWork.get());
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
auto localState = state;
auto work = [e, localState](const rxsc::schedulable&){
for (auto s : localState->subj) {
diff --git a/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp b/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp
index febba99..616e5d8 100644
--- a/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-with_latest_from.hpp
@@ -173,7 +173,7 @@ struct with_latest_from : public operator_base<rxu::value_type_t<with_latest_fro
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/operators/rx-zip.hpp b/Rx/v2/src/rxcpp/operators/rx-zip.hpp
index 90efe07..b8169fd 100644
--- a/Rx/v2/src/rxcpp/operators/rx-zip.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-zip.hpp
@@ -203,7 +203,7 @@ struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Sel
}
},
// on_error
- [state](std::exception_ptr e) {
+ [state](rxu::error_ptr e) {
state->out.on_error(e);
},
// on_completed
diff --git a/Rx/v2/src/rxcpp/rx-composite_exception.hpp b/Rx/v2/src/rxcpp/rx-composite_exception.hpp
index 333f291..cddd03e 100644
--- a/Rx/v2/src/rxcpp/rx-composite_exception.hpp
+++ b/Rx/v2/src/rxcpp/rx-composite_exception.hpp
@@ -11,7 +11,7 @@ namespace rxcpp {
struct composite_exception : std::exception {
- typedef std::vector<std::exception_ptr> exception_values;
+ typedef std::vector<rxu::error_ptr> exception_values;
virtual const char *what() const RXCPP_NOEXCEPT override {
return "rxcpp composite exception";
@@ -21,7 +21,7 @@ struct composite_exception : std::exception {
return exceptions.empty();
}
- virtual composite_exception add(std::exception_ptr exception_ptr) {
+ virtual composite_exception add(rxu::error_ptr exception_ptr) {
exceptions.push_back(exception_ptr);
return *this;
}
diff --git a/Rx/v2/src/rxcpp/rx-coordination.hpp b/Rx/v2/src/rxcpp/rx-coordination.hpp
index d2769e9..34e1223 100644
--- a/Rx/v2/src/rxcpp/rx-coordination.hpp
+++ b/Rx/v2/src/rxcpp/rx-coordination.hpp
@@ -227,7 +227,7 @@ class serialize_one_worker : public coordination_base
std::unique_lock<std::mutex> guard(*lock);
dest.on_next(v);
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
std::unique_lock<std::mutex> guard(*lock);
dest.on_error(e);
}
diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp
index 5104565..1f5c5e4 100644
--- a/Rx/v2/src/rxcpp/rx-includes.hpp
+++ b/Rx/v2/src/rxcpp/rx-includes.hpp
@@ -23,6 +23,10 @@
#define RXCPP_USE_RTTI 1
#endif
+#if _HAS_EXCEPTIONS
+#define RXCPP_USE_EXCEPTIONS 1
+#endif
+
#elif defined(__clang__)
#if __has_feature(cxx_rvalue_references)
@@ -34,6 +38,9 @@
#if __has_feature(cxx_variadic_templates)
#define RXCPP_USE_VARIADIC_TEMPLATES 1
#endif
+#if __has_feature(cxx_exceptions)
+#define RXCPP_USE_EXCEPTIONS 1
+#endif
#elif defined(__GNUG__)
@@ -53,6 +60,10 @@
#define RXCPP_USE_RTTI 1
#endif
+#if defined(__EXCEPTIONS)
+#define RXCPP_USE_EXCEPTIONS 1
+#endif
+
#endif
//
@@ -95,6 +106,11 @@
#define RXCPP_USE_RTTI RXCPP_FORCE_USE_RTTI
#endif
+#if defined(RXCPP_FORCE_USE_EXCEPTIONS)
+#undef RXCPP_USE_EXCEPTIONS
+#define RXCPP_USE_EXCEPTIONS RXCPP_FORCE_USE_EXCEPTIONS
+#endif
+
#if defined(RXCPP_FORCE_USE_WINRT)
#undef RXCPP_USE_WINRT
#define RXCPP_USE_WINRT RXCPP_FORCE_USE_WINRT
diff --git a/Rx/v2/src/rxcpp/rx-notification.hpp b/Rx/v2/src/rxcpp/rx-notification.hpp
index d624d34..20e0c69 100644
--- a/Rx/v2/src/rxcpp/rx-notification.hpp
+++ b/Rx/v2/src/rxcpp/rx-notification.hpp
@@ -106,7 +106,7 @@ auto equals(const T& lhs, const T& rhs, int)
template<class T>
bool equals(const T&, const T&, ...) {
- throw std::runtime_error("value does not support equality tests");
+ rxu::throw_exception(std::runtime_error("value does not support equality tests"));
return false;
}
@@ -146,26 +146,20 @@ private:
};
struct on_error_notification : public base {
- on_error_notification(std::exception_ptr ep) : ep(ep) {
+ on_error_notification(rxu::error_ptr ep) : ep(ep) {
}
on_error_notification(const on_error_notification& o) : ep(o.ep) {}
on_error_notification(const on_error_notification&& o) : ep(std::move(o.ep)) {}
on_error_notification& operator=(on_error_notification o) { ep = std::move(o.ep); return *this; }
virtual void out(std::ostream& os) const {
os << "on_error(";
- try {
- std::rethrow_exception(ep);
- } catch (const std::exception& e) {
- os << e.what();
- } catch (...) {
- os << "<not derived from std::exception>";
- }
+ os << rxu::what(ep);
os << ")";
}
virtual bool equals(const typename base::type& other) const {
bool result = false;
// not trying to compare exceptions
- other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](std::exception_ptr){
+ other->accept(make_subscriber<T>(make_observer_dynamic<T>([](T){}, [&result](rxu::error_ptr){
result = true;
})));
return result;
@@ -173,7 +167,7 @@ private:
virtual void accept(const typename base::observer_type& o) const {
o.on_error(ep);
}
- const std::exception_ptr ep;
+ const rxu::error_ptr ep;
};
struct on_completed_notification : public base {
@@ -199,20 +193,14 @@ private:
template<typename Exception>
static
type make_on_error(exception_tag&&, Exception&& e) {
- std::exception_ptr ep;
- try {
- throw std::forward<Exception>(e);
- }
- catch (...) {
- ep = std::current_exception();
- }
+ rxu::error_ptr ep = rxu::make_error_ptr(std::forward<Exception>(e));
return std::make_shared<on_error_notification>(ep);
}
struct exception_ptr_tag {};
static
- type make_on_error(exception_ptr_tag&&, std::exception_ptr ep) {
+ type make_on_error(exception_ptr_tag&&, rxu::error_ptr ep) {
return std::make_shared<on_error_notification>(ep);
}
@@ -229,7 +217,7 @@ public:
template<typename Exception>
static type on_error(Exception&& e) {
return make_on_error(typename std::conditional<
- std::is_same<rxu::decay_t<Exception>, std::exception_ptr>::value,
+ std::is_same<rxu::decay_t<Exception>, rxu::error_ptr>::value,
exception_ptr_tag, exception_tag>::type(),
std::forward<Exception>(e));
}
diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp
index 496db0b..3bbb448 100644
--- a/Rx/v2/src/rxcpp/rx-observable.hpp
+++ b/Rx/v2/src/rxcpp/rx-observable.hpp
@@ -173,7 +173,7 @@ class blocking_observable
-> void {
std::mutex lock;
std::condition_variable wake;
- std::exception_ptr error;
+ rxu::error_ptr error;
struct tracking
{
@@ -201,7 +201,7 @@ class blocking_observable
auto scbr = make_subscriber<T>(
dest,
[&](T t){dest.on_next(t);},
- [&](std::exception_ptr e){
+ [&](rxu::error_ptr e){
if (do_rethrow) {
error = e;
} else {
@@ -239,7 +239,7 @@ class blocking_observable
track->wakened = true;
if (!track->disposed || !track->wakened) std::terminate();
- if (error) {std::rethrow_exception(error);}
+ if (error) {rxu::rethrow_exception(error);}
}
public:
@@ -319,7 +319,7 @@ public:
cs,
[&](T v){result.reset(v); cs.unsubscribe();});
if (result.empty())
- throw rxcpp::empty_error("first() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
return result.get();
static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
}
@@ -345,7 +345,7 @@ public:
subscribe_with_rethrow(
[&](T v){result.reset(v);});
if (result.empty())
- throw rxcpp::empty_error("last() requires a stream with at least one value");
+ rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
return result.get();
static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
}
@@ -462,14 +462,13 @@ struct safe_subscriber
safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
void subscribe() {
- try {
+ RXCPP_TRY {
so->on_subscribe(*o);
- }
- catch(...) {
+ } RXCPP_CATCH(...) {
if (!o->is_subscribed()) {
- throw;
+ rxu::rethrow_current_exception();
}
- o->on_error(std::current_exception());
+ o->on_error(rxu::make_error_ptr(rxu::current_exception()));
o->unsubscribe();
}
}
diff --git a/Rx/v2/src/rxcpp/rx-observer.hpp b/Rx/v2/src/rxcpp/rx-observer.hpp
index 62aed3c..92321f1 100644
--- a/Rx/v2/src/rxcpp/rx-observer.hpp
+++ b/Rx/v2/src/rxcpp/rx-observer.hpp
@@ -25,14 +25,14 @@ struct OnNextEmpty
};
struct OnErrorEmpty
{
- void operator()(std::exception_ptr) const {
+ void operator()(rxu::error_ptr) const {
// error implicitly ignored, abort
std::terminate();
}
};
struct OnErrorIgnore
{
- void operator()(std::exception_ptr) const {
+ void operator()(rxu::error_ptr) const {
}
};
struct OnCompletedEmpty
@@ -76,7 +76,7 @@ struct OnErrorForward
OnErrorForward() : onerror() {}
explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {}
onerror_t onerror;
- void operator()(state_t& s, std::exception_ptr ep) const {
+ void operator()(state_t& s, rxu::error_ptr ep) const {
onerror(s, ep);
}
};
@@ -85,7 +85,7 @@ struct OnErrorForward<State, void>
{
using state_t = rxu::decay_t<State>;
OnErrorForward() {}
- void operator()(state_t& s, std::exception_ptr ep) const {
+ void operator()(state_t& s, rxu::error_ptr ep) const {
s.on_error(ep);
}
};
@@ -129,7 +129,7 @@ struct is_on_error
{
struct not_void {};
template<class CF>
- static auto check(int) -> decltype((*(CF*)nullptr)(*(std::exception_ptr*)nullptr));
+ static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr));
template<class CF>
static not_void check(...);
@@ -141,7 +141,7 @@ struct is_on_error_for
{
struct not_void {};
template<class CF>
- static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(std::exception_ptr*)nullptr));
+ static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr));
template<class CF>
static not_void check(...);
@@ -169,7 +169,7 @@ struct is_on_completed
\tparam T - the type of value in the stream
\tparam State - the type of the stored state
\tparam OnNext - the type of a function that matches `void(State&, T)`. Called 0 or more times. If `void` State::on_next will be called.
- \tparam OnError - the type of a function that matches `void(State&, std::exception_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called.
+ \tparam OnError - the type of a function that matches `void(State&, rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called.
\tparam OnCompleted - the type of a function that matches `void(State&)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_completed will be called.
\ingroup group-core
@@ -244,7 +244,7 @@ public:
void on_next(T&& t) const {
onnext(state, std::move(t));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
onerror(state, e);
}
void on_completed() const {
@@ -260,7 +260,7 @@ public:
\tparam T - the type of value in the stream
\tparam OnNext - the type of a function that matches `void(T)`. Called 0 or more times. If `void` OnNextEmpty<T> is used.
- \tparam OnError - the type of a function that matches `void(std::exception_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used.
+ \tparam OnError - the type of a function that matches `void(rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used.
\tparam OnCompleted - the type of a function that matches `void()`. Called 0 or 1 times, no further calls will be made. If `void` OnCompletedEmpty is used.
\ingroup group-core
@@ -291,7 +291,7 @@ private:
public:
static_assert(detail::is_on_next_of<T, on_next_t>::value, "Function supplied for on_next must be a function with the signature void(T);");
- static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(std::exception_ptr);");
+ static_assert(detail::is_on_error<on_error_t>::value, "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);");
static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");
observer()
@@ -332,7 +332,7 @@ public:
void on_next(T&& t) const {
onnext(std::move(t));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
onerror(e);
}
void on_completed() const {
@@ -352,7 +352,7 @@ struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T
virtual ~virtual_observer() {}
virtual void on_next(T&) const {};
virtual void on_next(T&&) const {};
- virtual void on_error(std::exception_ptr) const {};
+ virtual void on_error(rxu::error_ptr) const {};
virtual void on_completed() const {};
};
@@ -371,7 +371,7 @@ struct specific_observer : public virtual_observer<T>
virtual void on_next(T&& t) const {
destination.on_next(std::move(t));
}
- virtual void on_error(std::exception_ptr e) const {
+ virtual void on_error(rxu::error_ptr e) const {
destination.on_error(e);
}
virtual void on_completed() const {
@@ -439,7 +439,7 @@ public:
destination->on_next(std::forward<V>(v));
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
if (destination) {
destination->on_error(e);
}
@@ -640,10 +640,10 @@ template<class F, class OnError>
auto on_exception(const F& f, const OnError& c)
-> typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
typename detail::maybe_from_result<F>::type r;
- try {
+ RXCPP_TRY {
r.reset(f());
- } catch (...) {
- c(std::current_exception());
+ } RXCPP_CATCH(...) {
+ c(rxu::current_exception());
}
return r;
}
@@ -652,10 +652,10 @@ template<class F, class Subscriber>
auto on_exception(const F& f, const Subscriber& s)
-> typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
typename detail::maybe_from_result<F>::type r;
- try {
+ RXCPP_TRY {
r.reset(f());
- } catch (...) {
- s.on_error(std::current_exception());
+ } RXCPP_CATCH(...) {
+ s.on_error(rxu::current_exception());
}
return r;
}
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
index abdb7b6..3d6c515 100644
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ b/Rx/v2/src/rxcpp/rx-subscriber.hpp
@@ -50,11 +50,11 @@ class subscriber : public subscriber_base<T>
template<class U>
void operator()(U u) {
trace_activity().on_next_enter(*that, u);
- try {
+ RXCPP_TRY {
that->destination.on_next(std::move(u));
do_unsubscribe = false;
- } catch(...) {
- auto ex = std::current_exception();
+ } RXCPP_CATCH(...) {
+ auto ex = rxu::current_exception();
trace_activity().on_error_enter(*that, ex);
that->destination.on_error(std::move(ex));
trace_activity().on_error_return(*that);
@@ -75,7 +75,7 @@ class subscriber : public subscriber_base<T>
: that(that)
{
}
- inline void operator()(std::exception_ptr ex) {
+ inline void operator()(rxu::error_ptr ex) {
trace_activity().on_error_enter(*that, ex);
that->destination.on_error(std::move(ex));
}
@@ -180,7 +180,7 @@ public:
nextdetacher protect(this);
protect(std::forward<V>(v));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
if (!is_subscribed()) {
return;
}
diff --git a/Rx/v2/src/rxcpp/rx-trace.hpp b/Rx/v2/src/rxcpp/rx-trace.hpp
index a148953..bf0abaf 100644
--- a/Rx/v2/src/rxcpp/rx-trace.hpp
+++ b/Rx/v2/src/rxcpp/rx-trace.hpp
@@ -93,8 +93,8 @@ struct trace_noop
template<class Subscriber>
inline void on_next_return(const Subscriber&) {}
- template<class Subscriber>
- inline void on_error_enter(const Subscriber&, const std::exception_ptr&) {}
+ template<class Subscriber, class ErrorPtr>
+ inline void on_error_enter(const Subscriber&, const ErrorPtr&) {}
template<class Subscriber>
inline void on_error_return(const Subscriber&) {}
diff --git a/Rx/v2/src/rxcpp/rx-util.hpp b/Rx/v2/src/rxcpp/rx-util.hpp
index 97ff73f..cd5f39b 100644
--- a/Rx/v2/src/rxcpp/rx-util.hpp
+++ b/Rx/v2/src/rxcpp/rx-util.hpp
@@ -28,6 +28,18 @@
#define RXCPP_MAKE_IDENTIFIER(Prefix) RXCPP_CONCAT_EVALUATE(Prefix, __LINE__)
+// Provide replacements for try/catch keywords, using which is a compilation error
+// when exceptions are disabled with -fno-exceptions.
+#if RXCPP_USE_EXCEPTIONS
+#define RXCPP_TRY try
+#define RXCPP_CATCH(...) catch(__VA_ARGS__)
+// See also rxu::throw_exception for 'throw' keyword replacement.
+#else
+#define RXCPP_TRY if ((true))
+#define RXCPP_CATCH(...) if ((false))
+// See also rxu::throw_exception, which will std::terminate without exceptions.
+#endif
+
namespace rxcpp {
namespace util {
@@ -521,11 +533,16 @@ auto print_followed_by(OStream& os, DelimitValue dv)
}
inline std::string what(std::exception_ptr ep) {
+#if RXCPP_USE_EXCEPTIONS
try {std::rethrow_exception(ep);}
catch (const std::exception& ex) {
return ex.what();
+ } catch (...) {
+ return std::string("<not derived from std::exception>");
}
- return std::string();
+#endif
+ (void)ep;
+ return std::string("<exceptions are disabled>");
}
namespace detail {
@@ -690,9 +707,9 @@ public:
{
if (!!function)
{
- try {
+ RXCPP_TRY {
(*function)();
- } catch (...) {
+ } RXCPP_CATCH(...) {
std::terminate();
}
}
@@ -802,6 +819,129 @@ template <class T>
struct negation : detail::not_value<T> {};
}
+
+#if !RXCPP_USE_EXCEPTIONS
+namespace util {
+
+namespace detail {
+
+struct error_base {
+ virtual const char* what() = 0;
+ virtual ~error_base() {}
+};
+
+// Use the "Type Erasure" idiom to wrap an std::exception-like
+// value into an error pointer.
+//
+// Supported types:
+// exception, bad_exception, bad_alloc.
+template <class E>
+struct error_specific : public error_base {
+ error_specific(const E& e) : data(e) {}
+ error_specific(E&& e) : data(std::move(e)) {}
+
+ virtual ~error_specific() {}
+
+ virtual const char* what() {
+ return data.what();
+ }
+
+ E data;
+};
+
+}
+
+}
+#endif
+
+namespace util {
+
+#if RXCPP_USE_EXCEPTIONS
+using error_ptr = std::exception_ptr;
+#else
+// Note: std::exception_ptr cannot be used directly when exceptions are disabled.
+// Any attempt to 'throw' or to call into any of the std functions accepting
+// an std::exception_ptr will either fail to compile or result in an abort at runtime.
+using error_ptr = std::shared_ptr<util::detail::error_base>;
+
+inline std::string what(error_ptr ep) {
+ return std::string(ep->what());
+}
+#endif
+
+// TODO: Do we really need an identity make?
+// (It was causing some compilation errors deep inside templates).
+inline error_ptr make_error_ptr(error_ptr e) {
+ return e;
+}
+
+// Replace std::make_exception_ptr (which would immediately terminate
+// when exceptions are disabled).
+template <class E>
+error_ptr make_error_ptr(E&& e) {
+#if RXCPP_USE_EXCEPTIONS
+ return std::make_exception_ptr(std::forward<E>(e));
+#else
+ using e_type = rxcpp::util::decay_t<E>;
+ using pointed_to_type = rxcpp::util::detail::error_specific<e_type>;
+ auto sp = std::make_shared<pointed_to_type>(std::forward<E>(e));
+ return std::static_pointer_cast<rxcpp::util::detail::error_base>(sp);
+#endif
+}
+
+// Replace std::rethrow_exception to be compatible with our error_ptr typedef.
+[[noreturn]] inline void rethrow_exception(error_ptr e) {
+#if RXCPP_USE_EXCEPTIONS
+ std::rethrow_exception(e);
+#else
+ // error_ptr != std::exception_ptr so we can't use std::rethrow_exception
+ //
+ // However even if we could, calling std::rethrow_exception just terminates if exceptions are disabled.
+ //
+ // Therefore this function should only be called when we are completely giving up and have no idea
+ // how to handle the error.
+ (void)e;
+ std::terminate();
+#endif
+}
+
+// A replacement for the "throw" keyword which is illegal when
+// exceptions are disabled with -fno-exceptions.
+template <typename E>
+[[noreturn]] inline void throw_exception(E&& e) {
+#if RXCPP_USE_EXCEPTIONS
+ throw std::forward<E>(e);
+#else
+ // "throw" keyword is unsupported when exceptions are disabled.
+ // Immediately terminate instead.
+ (void)e;
+ std::terminate();
+#endif
+}
+
+// TODO: Do we really need this? rxu::rethrow_exception(rxu::current_exception())
+// would have the same semantics in either case.
+[[noreturn]] inline void rethrow_current_exception() {
+#if RXCPP_USE_EXCEPTIONS
+ std::rethrow_exception(std::current_exception());
+#else
+ std::terminate();
+#endif
+}
+
+// If called during exception handling, return the currently caught exception.
+// Otherwise return null.
+inline error_ptr current_exception() {
+#if RXCPP_USE_EXCEPTIONS
+ return std::current_exception();
+#else
+ // When exceptions are disabled, we can never be inside of a catch block.
+ // Return null similar to std::current_exception returning null outside of catch.
+ return nullptr;
+#endif
+}
+
+}
namespace rxu=util;
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
index 3f73e52..aaa073d 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-test.hpp
@@ -200,7 +200,7 @@ subscriber<T, rxt::testable_observer<T>> test_type::test_type_worker::make_subsc
recorded_type(ts->sc->clock(), notification_type::on_next(value)));
},
// on_error
- [ts](std::exception_ptr e)
+ [ts](rxu::error_ptr e)
{
ts->m.push_back(
recorded_type(ts->sc->clock(), notification_type::on_error(e)));
diff --git a/Rx/v2/src/rxcpp/sources/rx-error.hpp b/Rx/v2/src/rxcpp/sources/rx-error.hpp
index 461c081..f245d02 100644
--- a/Rx/v2/src/rxcpp/sources/rx-error.hpp
+++ b/Rx/v2/src/rxcpp/sources/rx-error.hpp
@@ -46,17 +46,17 @@ struct error : public source_base<T>
struct error_initial_type
{
- error_initial_type(std::exception_ptr e, coordination_type cn)
+ error_initial_type(rxu::error_ptr e, coordination_type cn)
: exception(e)
, coordination(std::move(cn))
{
}
- std::exception_ptr exception;
+ rxu::error_ptr exception;
coordination_type coordination;
};
error_initial_type initial;
- error(std::exception_ptr e, coordination_type cn)
+ error(rxu::error_ptr e, coordination_type cn)
: initial(e, std::move(cn))
{
}
@@ -93,7 +93,7 @@ struct throw_ptr_tag{};
struct throw_instance_tag{};
template <class T, class Coordination>
-auto make_error(throw_ptr_tag&&, std::exception_ptr exception, Coordination cn)
+auto make_error(throw_ptr_tag&&, rxu::error_ptr exception, Coordination cn)
-> observable<T, error<T, Coordination>> {
return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
}
@@ -101,26 +101,29 @@ auto make_error(throw_ptr_tag&&, std::exception_ptr exception, Coordination cn)
template <class T, class E, class Coordination>
auto make_error(throw_instance_tag&&, E e, Coordination cn)
-> observable<T, error<T, Coordination>> {
- std::exception_ptr exception;
- try {throw e;} catch(...) {exception = std::current_exception();}
- return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
+ rxu::error_ptr ep = rxu::make_error_ptr(e);
+ return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(ep), std::move(cn)));
+}
+
}
}
+namespace sources {
+
/*! @copydoc rx-error.hpp
*/
template<class T, class E>
auto error(E e)
- -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
- return detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
+ -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
+ return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
}
/*! @copydoc rx-error.hpp
*/
template<class T, class E, class Coordination>
auto error(E e, Coordination cn)
- -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
- return detail::make_error<T>(typename std::conditional<std::is_same<std::exception_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn));
+ -> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
+ return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn));
}
}
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index a77c0c4..4ce96b4 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -40,7 +40,7 @@ class multicast_observer
}
std::mutex lock;
typename mode::type current;
- std::exception_ptr error;
+ rxu::error_ptr error;
composite_subscription lifetime;
};
@@ -191,7 +191,7 @@ public:
}
}
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
std::unique_lock<std::mutex> guard(b->state->lock);
if (b->state->current == mode::Casting) {
b->state->error = e;
diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
index 161b422..858eef3 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
@@ -56,7 +56,7 @@ class synchronize_observer : public detail::multicast_observer<T>
auto keepAlive = this->shared_from_this();
auto drain_queue = [keepAlive, this](const rxsc::schedulable& self){
- try {
+ RXCPP_TRY {
std::unique_lock<std::mutex> guard(lock);
if (!destination.is_subscribed()) {
current = mode::Disposed;
@@ -74,8 +74,8 @@ class synchronize_observer : public detail::multicast_observer<T>
guard.unlock();
notification->accept(destination);
self();
- } catch(...) {
- destination.on_error(std::current_exception());
+ } RXCPP_CATCH(...) {
+ destination.on_error(rxu::current_exception());
std::unique_lock<std::mutex> guard(lock);
current = mode::Empty;
}
@@ -110,7 +110,7 @@ class synchronize_observer : public detail::multicast_observer<T>
}
wake.notify_one();
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
if (lifetime.is_subscribed()) {
std::unique_lock<std::mutex> guard(lock);
fill_queue.push_back(notification_type::on_error(e));
@@ -150,7 +150,7 @@ public:
void on_next(V v) const {
state->on_next(std::move(v));
}
- void on_error(std::exception_ptr e) const {
+ void on_error(rxu::error_ptr e) const {
state->on_error(e);
}
void on_completed() const {