From 46017d9e4d61e89dd1f3fb44cb662b8d65697058 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Mon, 28 Apr 2014 16:16:32 -0700 Subject: fixes and test for take --- Rx/v2/src/rxcpp/operators/rx-take_until.hpp | 30 ++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) (limited to 'Rx/v2/src/rxcpp/operators/rx-take_until.hpp') diff --git a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp index 608cde4..c93d568 100644 --- a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp +++ b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp @@ -63,6 +63,7 @@ struct take_until : public operator_base } mutable std::atomic mode_value; mutable std::atomic busy; + mutable std::mutex error_lock; mutable std::exception_ptr exception; output_type out; }; @@ -81,6 +82,7 @@ struct take_until : public operator_base if (fin == mode::triggered) { st->out.on_completed(); } else if (fin == mode::errored) { + std::unique_lock guard(st->error_lock); st->out.on_error(st->exception); } } @@ -100,18 +102,26 @@ struct take_until : public operator_base // on_next [state](const typename trigger_source_type::value_type&) { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); }, // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::clear; + typename mode::type v = state->mode_value; + if (v != mode::taking) {return;} + state->mode_value.compare_exchange_strong(v, mode::clear); } ); @@ -128,13 +138,19 @@ struct take_until : public operator_base // on_error [state](std::exception_ptr e) { activity finisher(state); - state->exception = e; - state->mode_value = mode::errored; + std::unique_lock guard(state->error_lock); + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + if (state->mode_value.compare_exchange_strong(v, mode::errored)) { + state->exception = e; + } }, // on_completed [state]() { activity finisher(state); - state->mode_value = mode::triggered; + typename mode::type v = state->mode_value; + if (v < mode::triggered) {return;} + state->mode_value.compare_exchange_strong(v, mode::triggered); } ); } -- cgit v1.2.3