diff options
author | 吴天同 <wutiantong@icloud.com> | 2017-01-12 14:17:49 +0800 |
---|---|---|
committer | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-01-12 12:11:11 -0800 |
commit | e1360ef0de064ef55c119791fe55fa6674bada9e (patch) | |
tree | fb8a6646b464b05b39d2236a2c952f438168e8bb /Rx/v2/src | |
parent | 5af562282f25a500d17da9c283541d7ab33d1785 (diff) | |
download | RxCpp-e1360ef0de064ef55c119791fe55fa6674bada9e.tar.gz |
Patch for issue with lifetime of subject's observers #324
Diffstat (limited to 'Rx/v2/src')
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-subject.hpp | 22 |
1 files changed, 6 insertions, 16 deletions
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp index 142e613..066f152 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp @@ -34,12 +34,10 @@ class multicast_observer : public std::enable_shared_from_this<state_type> { explicit state_type(composite_subscription cs) - : generation(0) - , current(mode::Casting) + : current(mode::Casting) , lifetime(cs) { } - std::atomic<int> generation; std::mutex lock; typename mode::type current; std::exception_ptr error; @@ -85,7 +83,6 @@ class multicast_observer explicit binder_type(composite_subscription cs) : state(std::make_shared<state_type>(cs)) , id(trace_id::make_next_id_subscriber()) - , current_generation(0) { } @@ -94,8 +91,7 @@ class multicast_observer trace_id id; // used to avoid taking lock in on_next - mutable int current_generation; - mutable std::shared_ptr<completer_type> current_completer; + mutable std::weak_ptr<completer_type> current_completer; // must only be accessed under state->lock mutable std::shared_ptr<completer_type> completer; @@ -116,7 +112,6 @@ public: b->state->current = mode::Disposed; b->current_completer.reset(); b->completer.reset(); - ++b->state->generation; } }); } @@ -131,7 +126,7 @@ public: } bool has_observers() const { std::unique_lock<std::mutex> guard(b->state->lock); - return b->current_completer && !b->current_completer->observers.empty(); + return b->completer && !b->completer->observers.empty(); } template<class SubscriberFrom> void add(const SubscriberFrom& sf, observer_type o) const { @@ -147,11 +142,9 @@ public: if (b) { std::unique_lock<std::mutex> guard(b->state->lock); b->completer = std::make_shared<completer_type>(b->state, b->completer); - ++b->state->generation; } }); b->completer = std::make_shared<completer_type>(b->state, b->completer, o); - ++b->state->generation; } } break; @@ -183,13 +176,12 @@ public: } template<class V> void on_next(V v) const { - if (b->current_generation != b->state->generation) { + auto current_completer = b->current_completer.lock(); + if (!current_completer) { std::unique_lock<std::mutex> guard(b->state->lock); - b->current_generation = b->state->generation; b->current_completer = b->completer; + current_completer = b->current_completer.lock(); } - - auto current_completer = b->current_completer; if (!current_completer || current_completer->observers.empty()) { return; } @@ -207,7 +199,6 @@ public: auto s = b->state->lifetime; auto c = std::move(b->completer); b->current_completer.reset(); - ++b->state->generation; guard.unlock(); if (c) { for (auto& o : c->observers) { @@ -226,7 +217,6 @@ public: auto s = b->state->lifetime; auto c = std::move(b->completer); b->current_completer.reset(); - ++b->state->generation; guard.unlock(); if (c) { for (auto& o : c->observers) { |