summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp')
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp15
1 files changed, 9 insertions, 6 deletions
diff --git a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
index c28fa91..564f564 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-synchronize.hpp
@@ -143,6 +143,10 @@ public:
state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
}
+ subscriber<T> get_subscriber() const {
+ return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).as_dynamic();
+ }
+
template<class V>
void on_next(V v) const {
state->on_next(std::move(v));
@@ -160,13 +164,11 @@ public:
template<class T, class Coordination>
class synchronize
{
- composite_subscription lifetime;
detail::synchronize_observer<T, Coordination> s;
public:
explicit synchronize(Coordination cn, composite_subscription cs = composite_subscription())
- : lifetime(composite_subscription())
- , s(std::move(cn), std::move(cs), lifetime)
+ : s(std::move(cn), std::move(cs), composite_subscription())
{
}
@@ -175,12 +177,13 @@ public:
}
subscriber<T> get_subscriber() const {
- return make_subscriber<T>(lifetime, make_observer_dynamic<T>(observer<T, detail::synchronize_observer<T, Coordination>>(s)));
+ return s.get_subscriber();
}
observable<T> get_observable() const {
- return make_observable_dynamic<T>([this](subscriber<T> o){
- this->s.add(std::move(o));
+ auto keepAlive = s;
+ return make_observable_dynamic<T>([=](subscriber<T> o){
+ keepAlive.add(s.get_subscriber(), std::move(o));
});
}
};