summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take.hpp7
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take_until.hpp30
2 files changed, 26 insertions, 11 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-take.hpp b/Rx/v2/src/rxcpp/operators/rx-take.hpp
index b8ddc2f..ff903f0 100644
--- a/Rx/v2/src/rxcpp/operators/rx-take.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-take.hpp
@@ -39,7 +39,6 @@ struct take : public operator_base<T>
{
enum type {
taking,
- clear,
triggered,
errored,
stopped
@@ -75,9 +74,9 @@ struct take : public operator_base<T>
if (--state->count > 0) {
state->out.on_next(t);
} else {
+ state->mode_value = mode::triggered;
state->out.on_next(t);
- state->mode_value = mode::clear;
- state->out.unsubscribe();
+ state->out.on_completed();
}
}
},
@@ -88,7 +87,7 @@ struct take : public operator_base<T>
},
// on_completed
[state]() {
- state->mode_value = mode::triggered;
+ state->mode_value = mode::stopped;
state->out.on_completed();
}
);
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
index 608cde4..c93d568 100644
--- a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
@@ -63,6 +63,7 @@ struct take_until : public operator_base<T>
}
mutable std::atomic<typename mode::type> mode_value;
mutable std::atomic<int> busy;
+ mutable std::mutex error_lock;
mutable std::exception_ptr exception;
output_type out;
};
@@ -81,6 +82,7 @@ struct take_until : public operator_base<T>
if (fin == mode::triggered) {
st->out.on_completed();
} else if (fin == mode::errored) {
+ std::unique_lock<std::mutex> guard(st->error_lock);
st->out.on_error(st->exception);
}
}
@@ -100,18 +102,26 @@ struct take_until : public operator_base<T>
// on_next
[state](const typename trigger_source_type::value_type&) {
activity finisher(state);
- state->mode_value = mode::triggered;
+ typename mode::type v = state->mode_value;
+ if (v != mode::taking) {return;}
+ state->mode_value.compare_exchange_strong(v, mode::triggered);
},
// on_error
[state](std::exception_ptr e) {
activity finisher(state);
- state->exception = e;
- state->mode_value = mode::errored;
+ std::unique_lock<std::mutex> guard(state->error_lock);
+ typename mode::type v = state->mode_value;
+ if (v != mode::taking) {return;}
+ if (state->mode_value.compare_exchange_strong(v, mode::errored)) {
+ state->exception = e;
+ }
},
// on_completed
[state]() {
activity finisher(state);
- state->mode_value = mode::clear;
+ typename mode::type v = state->mode_value;
+ if (v != mode::taking) {return;}
+ state->mode_value.compare_exchange_strong(v, mode::clear);
}
);
@@ -128,13 +138,19 @@ struct take_until : public operator_base<T>
// on_error
[state](std::exception_ptr e) {
activity finisher(state);
- state->exception = e;
- state->mode_value = mode::errored;
+ std::unique_lock<std::mutex> guard(state->error_lock);
+ typename mode::type v = state->mode_value;
+ if (v < mode::triggered) {return;}
+ if (state->mode_value.compare_exchange_strong(v, mode::errored)) {
+ state->exception = e;
+ }
},
// on_completed
[state]() {
activity finisher(state);
- state->mode_value = mode::triggered;
+ typename mode::type v = state->mode_value;
+ if (v < mode::triggered) {return;}
+ state->mode_value.compare_exchange_strong(v, mode::triggered);
}
);
}