diff options
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-take.hpp | 7 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-take_until.hpp | 30 |
2 files changed, 26 insertions, 11 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-take.hpp b/Rx/v2/src/rxcpp/operators/rx-take.hpp index b8ddc2f..ff903f0 100644 --- a/Rx/v2/src/rxcpp/operators/rx-take.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-take.hpp @@ -39,7 +39,6 @@ struct take : public operator_base<T> { enum type { taking, - clear, triggered, errored, stopped @@ -75,9 +74,9 @@ struct take : public operator_base<T> if (--state->count > 0) { state->out.on_next(t); } else { + state->mode_value = mode::triggered; state->out.on_next(t); - state->mode_value = mode::clear; - state->out.unsubscribe(); + state->out.on_completed(); } } }, @@ -88,7 +87,7 @@ struct take : public operator_base<T> }, // on_completed [state]() { - state->mode_value = mode::triggered; + state->mode_value = mode::stopped; state->out.on_completed(); } ); diff --git a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp index 608cde4..c93d568 100644 --- a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp @@ -63,6 +63,7 @@ struct take_until : public operator_base<T> } mutable std::atomic<typename mode::type> mode_value; mutable std::atomic<int> busy; + mutable std::mutex error_lock; mutable std::exception_ptr exception; output_type out; }; @@ -81,6 +82,7 @@ struct take_until : public operator_base<T> if (fin == mode::triggered) { st->out.on_completed(); } else if (fin == mode::errored) { + std::unique_lock<std::mutex> guard(st->error_lock); st->out.on_error(st->exception); } } @@ -100,18 +102,26 @@ struct take_until : public operator_base<T> // on_next [state](const typename trigger_source_type::value_type&) { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); }, // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock<std::mutex> guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::clear; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::clear); } ); @@ -128,13 +138,19 @@ struct take_until : public operator_base<T> // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock<std::mutex> guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); } ); } |