summaryrefslogtreecommitdiff
path: root/Rx/v2/src
diff options
context:
space:
mode:
author吴天同 <wutiantong@icloud.com>2017-01-12 14:17:49 +0800
committerKirk Shoop <kirk.shoop@microsoft.com>2017-01-12 12:11:11 -0800
commite1360ef0de064ef55c119791fe55fa6674bada9e (patch)
treefb8a6646b464b05b39d2236a2c952f438168e8bb /Rx/v2/src
parent5af562282f25a500d17da9c283541d7ab33d1785 (diff)
downloadRxCpp-e1360ef0de064ef55c119791fe55fa6674bada9e.tar.gz
Patch for issue with lifetime of subject's observers #324
Diffstat (limited to 'Rx/v2/src')
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-subject.hpp22
1 files changed, 6 insertions, 16 deletions
diff --git a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
index 142e613..066f152 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-subject.hpp
@@ -34,12 +34,10 @@ class multicast_observer
: public std::enable_shared_from_this<state_type>
{
explicit state_type(composite_subscription cs)
- : generation(0)
- , current(mode::Casting)
+ : current(mode::Casting)
, lifetime(cs)
{
}
- std::atomic<int> generation;
std::mutex lock;
typename mode::type current;
std::exception_ptr error;
@@ -85,7 +83,6 @@ class multicast_observer
explicit binder_type(composite_subscription cs)
: state(std::make_shared<state_type>(cs))
, id(trace_id::make_next_id_subscriber())
- , current_generation(0)
{
}
@@ -94,8 +91,7 @@ class multicast_observer
trace_id id;
// used to avoid taking lock in on_next
- mutable int current_generation;
- mutable std::shared_ptr<completer_type> current_completer;
+ mutable std::weak_ptr<completer_type> current_completer;
// must only be accessed under state->lock
mutable std::shared_ptr<completer_type> completer;
@@ -116,7 +112,6 @@ public:
b->state->current = mode::Disposed;
b->current_completer.reset();
b->completer.reset();
- ++b->state->generation;
}
});
}
@@ -131,7 +126,7 @@ public:
}
bool has_observers() const {
std::unique_lock<std::mutex> guard(b->state->lock);
- return b->current_completer && !b->current_completer->observers.empty();
+ return b->completer && !b->completer->observers.empty();
}
template<class SubscriberFrom>
void add(const SubscriberFrom& sf, observer_type o) const {
@@ -147,11 +142,9 @@ public:
if (b) {
std::unique_lock<std::mutex> guard(b->state->lock);
b->completer = std::make_shared<completer_type>(b->state, b->completer);
- ++b->state->generation;
}
});
b->completer = std::make_shared<completer_type>(b->state, b->completer, o);
- ++b->state->generation;
}
}
break;
@@ -183,13 +176,12 @@ public:
}
template<class V>
void on_next(V v) const {
- if (b->current_generation != b->state->generation) {
+ auto current_completer = b->current_completer.lock();
+ if (!current_completer) {
std::unique_lock<std::mutex> guard(b->state->lock);
- b->current_generation = b->state->generation;
b->current_completer = b->completer;
+ current_completer = b->current_completer.lock();
}
-
- auto current_completer = b->current_completer;
if (!current_completer || current_completer->observers.empty()) {
return;
}
@@ -207,7 +199,6 @@ public:
auto s = b->state->lifetime;
auto c = std::move(b->completer);
b->current_completer.reset();
- ++b->state->generation;
guard.unlock();
if (c) {
for (auto& o : c->observers) {
@@ -226,7 +217,6 @@ public:
auto s = b->state->lifetime;
auto c = std::move(b->completer);
b->current_completer.reset();
- ++b->state->generation;
guard.unlock();
if (c) {
for (auto& o : c->observers) {