diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-28 16:16:32 -0700 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2014-04-28 16:16:32 -0700 |
commit | 46017d9e4d61e89dd1f3fb44cb662b8d65697058 (patch) | |
tree | c60d0e8bbb40367ced0327505b97e16b318d6caf /Rx/v2/src/rxcpp/operators/rx-take_until.hpp | |
parent | 9d8db1823e90aaa62bd33b2ff93d3b5c5e0c2891 (diff) | |
download | RxCpp-46017d9e4d61e89dd1f3fb44cb662b8d65697058.tar.gz |
fixes and test for take
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-take_until.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-take_until.hpp | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp index 608cde4..c93d568 100644 --- a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp @@ -63,6 +63,7 @@ struct take_until : public operator_base<T> } mutable std::atomic<typename mode::type> mode_value; mutable std::atomic<int> busy; + mutable std::mutex error_lock; mutable std::exception_ptr exception; output_type out; }; @@ -81,6 +82,7 @@ struct take_until : public operator_base<T> if (fin == mode::triggered) { st->out.on_completed(); } else if (fin == mode::errored) { + std::unique_lock<std::mutex> guard(st->error_lock); st->out.on_error(st->exception); } } @@ -100,18 +102,26 @@ struct take_until : public operator_base<T> // on_next [state](const typename trigger_source_type::value_type&) { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); }, // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock<std::mutex> guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::clear; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::clear); } ); @@ -128,13 +138,19 @@ struct take_until : public operator_base<T> // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock<std::mutex> guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); } ); } |