summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp')
-rw-r--r--Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp30
1 files changed, 18 insertions, 12 deletions
diff --git a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
index b7f7d68..020b0f8 100644
--- a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
+++ b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp
@@ -43,6 +43,7 @@ class replay_observer : public detail::multicast_observer<T>
mutable std::list<time_point_type> time_points;
mutable count_type count;
mutable period_type period;
+ mutable composite_subscription replayLifetime;
public:
mutable coordination_type coordination;
mutable coordinator_type coordinator;
@@ -56,9 +57,13 @@ class replay_observer : public detail::multicast_observer<T>
}
public:
- explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator)
+ ~replay_observer_state(){
+ replayLifetime.unsubscribe();
+ }
+ explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime)
: count(_count)
, period(_period)
+ , replayLifetime(_replayLifetime)
, coordination(std::move(_coordination))
, coordinator(std::move(_coordinator))
{
@@ -66,6 +71,7 @@ class replay_observer : public detail::multicast_observer<T>
void add(T v) const {
std::unique_lock<std::mutex> guard(lock);
+
if (!count.empty()) {
if (values.size() == count.get())
remove_oldest();
@@ -89,11 +95,12 @@ class replay_observer : public detail::multicast_observer<T>
std::shared_ptr<replay_observer_state> state;
public:
- replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription cs)
- : base_type(cs)
+ replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime)
+ : base_type(subscriberLifetime)
{
- auto coordinator = coordination.create_coordinator(cs);
- state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator));
+ replayLifetime.add(subscriberLifetime);
+ auto coordinator = coordination.create_coordinator(replayLifetime);
+ state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime));
}
subscriber<T> get_subscriber() const {
@@ -129,22 +136,22 @@ class replay
public:
explicit replay(Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(), period_type(), cn, cs)
+ : s(count_type(), period_type(), cn, cs, composite_subscription{})
{
}
replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(std::move(count)), period_type(), cn, cs)
+ : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{})
{
}
replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(), period_type(period), cn, cs)
+ : s(count_type(), period_type(period), cn, cs, composite_subscription{})
{
}
replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
- : s(count_type(count), period_type(period), cn, cs)
+ : s(count_type(count), period_type(period), cn, cs, composite_subscription{})
{
}
@@ -163,9 +170,8 @@ public:
observable<T> get_observable() const {
auto keepAlive = s;
auto observable = make_observable_dynamic<T>([=](subscriber<T> o){
- if (keepAlive.get_subscription().is_subscribed()) {
- for (auto&& value: get_values())
- o.on_next(value);
+ for (auto&& value: get_values()) {
+ o.on_next(value);
}
keepAlive.add(keepAlive.get_subscriber(), std::move(o));
});