diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-take_until.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-take_until.hpp | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp index 608cde4..c93d568 100644 --- a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp @@ -63,6 +63,7 @@ struct take_until : public operator_base<T> } mutable std::atomic<typename mode::type> mode_value; mutable std::atomic<int> busy; + mutable std::mutex error_lock; mutable std::exception_ptr exception; output_type out; }; @@ -81,6 +82,7 @@ struct take_until : public operator_base<T> if (fin == mode::triggered) { st->out.on_completed(); } else if (fin == mode::errored) { + std::unique_lock<std::mutex> guard(st->error_lock); st->out.on_error(st->exception); } } @@ -100,18 +102,26 @@ struct take_until : public operator_base<T> // on_next [state](const typename trigger_source_type::value_type&) { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); }, // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock<std::mutex> guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::clear; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::clear); } ); @@ -128,13 +138,19 @@ struct take_until : public operator_base<T> // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock<std::mutex> guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); } ); } |