summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2014-04-28 16:16:32 -0700
committerKirk Shoop <kirk.shoop@microsoft.com>2014-04-28 16:16:32 -0700
commit46017d9e4d61e89dd1f3fb44cb662b8d65697058 (patch)
treec60d0e8bbb40367ced0327505b97e16b318d6caf /Rx/v2/src/rxcpp/operators/rx-take_until.hpp
parent9d8db1823e90aaa62bd33b2ff93d3b5c5e0c2891 (diff)
downloadRxCpp-46017d9e4d61e89dd1f3fb44cb662b8d65697058.tar.gz
fixes and test for take
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-take_until.hpp')
-rw-r--r--Rx/v2/src/rxcpp/operators/rx-take_until.hpp30
1 files changed, 23 insertions, 7 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
index 608cde4..c93d568 100644
--- a/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
+++ b/Rx/v2/src/rxcpp/operators/rx-take_until.hpp
@@ -63,6 +63,7 @@ struct take_until : public operator_base<T>
}
mutable std::atomic<typename mode::type> mode_value;
mutable std::atomic<int> busy;
+ mutable std::mutex error_lock;
mutable std::exception_ptr exception;
output_type out;
};
@@ -81,6 +82,7 @@ struct take_until : public operator_base<T>
if (fin == mode::triggered) {
st->out.on_completed();
} else if (fin == mode::errored) {
+ std::unique_lock<std::mutex> guard(st->error_lock);
st->out.on_error(st->exception);
}
}
@@ -100,18 +102,26 @@ struct take_until : public operator_base<T>
// on_next
[state](const typename trigger_source_type::value_type&) {
activity finisher(state);
- state->mode_value = mode::triggered;
+ typename mode::type v = state->mode_value;
+ if (v != mode::taking) {return;}
+ state->mode_value.compare_exchange_strong(v, mode::triggered);
},
// on_error
[state](std::exception_ptr e) {
activity finisher(state);
- state->exception = e;
- state->mode_value = mode::errored;
+ std::unique_lock<std::mutex> guard(state->error_lock);
+ typename mode::type v = state->mode_value;
+ if (v != mode::taking) {return;}
+ if (state->mode_value.compare_exchange_strong(v, mode::errored)) {
+ state->exception = e;
+ }
},
// on_completed
[state]() {
activity finisher(state);
- state->mode_value = mode::clear;
+ typename mode::type v = state->mode_value;
+ if (v != mode::taking) {return;}
+ state->mode_value.compare_exchange_strong(v, mode::clear);
}
);
@@ -128,13 +138,19 @@ struct take_until : public operator_base<T>
// on_error
[state](std::exception_ptr e) {
activity finisher(state);
- state->exception = e;
- state->mode_value = mode::errored;
+ std::unique_lock<std::mutex> guard(state->error_lock);
+ typename mode::type v = state->mode_value;
+ if (v < mode::triggered) {return;}
+ if (state->mode_value.compare_exchange_strong(v, mode::errored)) {
+ state->exception = e;
+ }
},
// on_completed
[state]() {
activity finisher(state);
- state->mode_value = mode::triggered;
+ typename mode::type v = state->mode_value;
+ if (v < mode::triggered) {return;}
+ state->mode_value.compare_exchange_strong(v, mode::triggered);
}
);
}