diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp index da45e8d..4f83767 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp @@ -30,15 +30,21 @@ private: composite_subscription lifetime; worker controller; + std::shared_ptr<const scheduler_interface> alive; public: virtual ~loop_worker() { } - loop_worker(composite_subscription cs, worker w) + loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive) : lifetime(cs) , controller(w) + , alive(alive) { + auto token = controller.add(cs); + cs.add([token, w](){ + w.remove(token); + }); } virtual clock_type::time_point now() const { @@ -57,6 +63,7 @@ private: mutable thread_factory factory; scheduler newthread; mutable std::atomic<std::size_t> count; + composite_subscription loops_lifetime; std::vector<worker> loops; public: @@ -69,7 +76,7 @@ public: { auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); while (remaining--) { - loops.push_back(newthread.create_worker()); + loops.push_back(newthread.create_worker(loops_lifetime)); } } explicit event_loop(thread_factory tf) @@ -79,11 +86,12 @@ public: { auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); while (remaining--) { - loops.push_back(newthread.create_worker()); + loops.push_back(newthread.create_worker(loops_lifetime)); } } virtual ~event_loop() { + loops_lifetime.unsubscribe(); } virtual clock_type::time_point now() const { @@ -91,7 +99,7 @@ public: } virtual worker create_worker(composite_subscription cs) const { - return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()])); + return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this())); } }; |