diff options
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp | 16 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | 18 |
2 files changed, 20 insertions, 14 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp index da45e8d..4f83767 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp @@ -30,15 +30,21 @@ private: composite_subscription lifetime; worker controller; + std::shared_ptr<const scheduler_interface> alive; public: virtual ~loop_worker() { } - loop_worker(composite_subscription cs, worker w) + loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive) : lifetime(cs) , controller(w) + , alive(alive) { + auto token = controller.add(cs); + cs.add([token, w](){ + w.remove(token); + }); } virtual clock_type::time_point now() const { @@ -57,6 +63,7 @@ private: mutable thread_factory factory; scheduler newthread; mutable std::atomic<std::size_t> count; + composite_subscription loops_lifetime; std::vector<worker> loops; public: @@ -69,7 +76,7 @@ public: { auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); while (remaining--) { - loops.push_back(newthread.create_worker()); + loops.push_back(newthread.create_worker(loops_lifetime)); } } explicit event_loop(thread_factory tf) @@ -79,11 +86,12 @@ public: { auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); while (remaining--) { - loops.push_back(newthread.create_worker()); + loops.push_back(newthread.create_worker(loops_lifetime)); } } virtual ~event_loop() { + loops_lifetime.unsubscribe(); } virtual clock_type::time_point now() const { @@ -91,7 +99,7 @@ public: } virtual worker create_worker(composite_subscription cs) const { - return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()])); + return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this())); } }; diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp index c683996..e31ed55 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp @@ -37,16 +37,6 @@ private: virtual ~new_worker_state() { - std::unique_lock<std::mutex> guard(lock); - if (worker.joinable() && worker.get_id() != std::this_thread::get_id()) { - lifetime.unsubscribe(); - guard.unlock(); - worker.join(); - } - else { - lifetime.unsubscribe(); - worker.detach(); - } } explicit new_worker_state(composite_subscription cs) @@ -84,6 +74,14 @@ private: auto expired = std::move(keepAlive->q); if (!keepAlive->q.empty()) std::terminate(); keepAlive->wake.notify_one(); + + if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) { + guard.unlock(); + keepAlive->worker.join(); + } + else { + keepAlive->worker.detach(); + } }); state->worker = tf([keepAlive](){ |