diff options
author | Kirk Shoop <kirk.shoop@microsoft.com> | 2017-09-01 17:15:10 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-09-01 17:15:10 -0700 |
commit | 7d9859639c3abcf80d6a424dd3cffea8f11c7738 (patch) | |
tree | 7b3b467de0ff0ea36503e425c52b11eb16a61eba /Rx | |
parent | 5b7b2a5519f712a7831053865a4ae6082dac4ed5 (diff) | |
download | RxCpp-7d9859639c3abcf80d6a424dd3cffea8f11c7738.tar.gz |
shutdown event loop threads (#394)
attempt to fix #393
use event_loop lifetime to unsubscribe from new_thread workers
tested with all the perf tests, but messing with thread lifetime can
break existing code.
Diffstat (limited to 'Rx')
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp | 16 | ||||
-rw-r--r-- | Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp | 18 |
2 files changed, 20 insertions, 14 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp index da45e8d..4f83767 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp @@ -30,15 +30,21 @@ private: composite_subscription lifetime; worker controller; + std::shared_ptr<const scheduler_interface> alive; public: virtual ~loop_worker() { } - loop_worker(composite_subscription cs, worker w) + loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive) : lifetime(cs) , controller(w) + , alive(alive) { + auto token = controller.add(cs); + cs.add([token, w](){ + w.remove(token); + }); } virtual clock_type::time_point now() const { @@ -57,6 +63,7 @@ private: mutable thread_factory factory; scheduler newthread; mutable std::atomic<std::size_t> count; + composite_subscription loops_lifetime; std::vector<worker> loops; public: @@ -69,7 +76,7 @@ public: { auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); while (remaining--) { - loops.push_back(newthread.create_worker()); + loops.push_back(newthread.create_worker(loops_lifetime)); } } explicit event_loop(thread_factory tf) @@ -79,11 +86,12 @@ public: { auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4)); while (remaining--) { - loops.push_back(newthread.create_worker()); + loops.push_back(newthread.create_worker(loops_lifetime)); } } virtual ~event_loop() { + loops_lifetime.unsubscribe(); } virtual clock_type::time_point now() const { @@ -91,7 +99,7 @@ public: } virtual worker create_worker(composite_subscription cs) const { - return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()])); + return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this())); } }; diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp index c683996..e31ed55 100644 --- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp +++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp @@ -37,16 +37,6 @@ private: virtual ~new_worker_state() { - std::unique_lock<std::mutex> guard(lock); - if (worker.joinable() && worker.get_id() != std::this_thread::get_id()) { - lifetime.unsubscribe(); - guard.unlock(); - worker.join(); - } - else { - lifetime.unsubscribe(); - worker.detach(); - } } explicit new_worker_state(composite_subscription cs) @@ -84,6 +74,14 @@ private: auto expired = std::move(keepAlive->q); if (!keepAlive->q.empty()) std::terminate(); keepAlive->wake.notify_one(); + + if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) { + guard.unlock(); + keepAlive->worker.join(); + } + else { + keepAlive->worker.detach(); + } }); state->worker = tf([keepAlive](){ |