diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp index b7f7d68..020b0f8 100644 --- a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp +++ b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp @@ -43,6 +43,7 @@ class replay_observer : public detail::multicast_observer<T> mutable std::list<time_point_type> time_points; mutable count_type count; mutable period_type period; + mutable composite_subscription replayLifetime; public: mutable coordination_type coordination; mutable coordinator_type coordinator; @@ -56,9 +57,13 @@ class replay_observer : public detail::multicast_observer<T> } public: - explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator) + ~replay_observer_state(){ + replayLifetime.unsubscribe(); + } + explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime) : count(_count) , period(_period) + , replayLifetime(_replayLifetime) , coordination(std::move(_coordination)) , coordinator(std::move(_coordinator)) { @@ -66,6 +71,7 @@ class replay_observer : public detail::multicast_observer<T> void add(T v) const { std::unique_lock<std::mutex> guard(lock); + if (!count.empty()) { if (values.size() == count.get()) remove_oldest(); @@ -89,11 +95,12 @@ class replay_observer : public detail::multicast_observer<T> std::shared_ptr<replay_observer_state> state; public: - replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription cs) - : base_type(cs) + replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime) + : base_type(subscriberLifetime) { - auto coordinator = coordination.create_coordinator(cs); - state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator)); + replayLifetime.add(subscriberLifetime); + auto coordinator = coordination.create_coordinator(replayLifetime); + state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime)); } subscriber<T> get_subscriber() const { @@ -129,22 +136,22 @@ class replay public: explicit replay(Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(), cn, cs) + : s(count_type(), period_type(), cn, cs, composite_subscription{}) { } replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(std::move(count)), period_type(), cn, cs) + : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{}) { } replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(period), cn, cs) + : s(count_type(), period_type(period), cn, cs, composite_subscription{}) { } replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(count), period_type(period), cn, cs) + : s(count_type(count), period_type(period), cn, cs, composite_subscription{}) { } @@ -163,9 +170,8 @@ public: observable<T> get_observable() const { auto keepAlive = s; auto observable = make_observable_dynamic<T>([=](subscriber<T> o){ - if (keepAlive.get_subscription().is_subscribed()) { - for (auto&& value: get_values()) - o.on_next(value); + for (auto&& value: get_values()) { + o.on_next(value); } keepAlive.add(keepAlive.get_subscriber(), std::move(o)); }); |