summaryrefslogtreecommitdiff
path: root/Rx
diff options
context:
space:
mode:
authorKirk Shoop <kirk.shoop@microsoft.com>2017-09-01 17:15:10 -0700
committerGitHub <noreply@github.com>2017-09-01 17:15:10 -0700
commit7d9859639c3abcf80d6a424dd3cffea8f11c7738 (patch)
tree7b3b467de0ff0ea36503e425c52b11eb16a61eba /Rx
parent5b7b2a5519f712a7831053865a4ae6082dac4ed5 (diff)
downloadRxCpp-7d9859639c3abcf80d6a424dd3cffea8f11c7738.tar.gz
shutdown event loop threads (#394)
attempt to fix #393 use event_loop lifetime to unsubscribe from new_thread workers tested with all the perf tests, but messing with thread lifetime can break existing code.
Diffstat (limited to 'Rx')
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp16
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp18
2 files changed, 20 insertions, 14 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp
index da45e8d..4f83767 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-eventloop.hpp
@@ -30,15 +30,21 @@ private:
composite_subscription lifetime;
worker controller;
+ std::shared_ptr<const scheduler_interface> alive;
public:
virtual ~loop_worker()
{
}
- loop_worker(composite_subscription cs, worker w)
+ loop_worker(composite_subscription cs, worker w, std::shared_ptr<const scheduler_interface> alive)
: lifetime(cs)
, controller(w)
+ , alive(alive)
{
+ auto token = controller.add(cs);
+ cs.add([token, w](){
+ w.remove(token);
+ });
}
virtual clock_type::time_point now() const {
@@ -57,6 +63,7 @@ private:
mutable thread_factory factory;
scheduler newthread;
mutable std::atomic<std::size_t> count;
+ composite_subscription loops_lifetime;
std::vector<worker> loops;
public:
@@ -69,7 +76,7 @@ public:
{
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
while (remaining--) {
- loops.push_back(newthread.create_worker());
+ loops.push_back(newthread.create_worker(loops_lifetime));
}
}
explicit event_loop(thread_factory tf)
@@ -79,11 +86,12 @@ public:
{
auto remaining = std::max(std::thread::hardware_concurrency(), unsigned(4));
while (remaining--) {
- loops.push_back(newthread.create_worker());
+ loops.push_back(newthread.create_worker(loops_lifetime));
}
}
virtual ~event_loop()
{
+ loops_lifetime.unsubscribe();
}
virtual clock_type::time_point now() const {
@@ -91,7 +99,7 @@ public:
}
virtual worker create_worker(composite_subscription cs) const {
- return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()]));
+ return worker(cs, std::make_shared<loop_worker>(cs, loops[++count % loops.size()], this->shared_from_this()));
}
};
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
index c683996..e31ed55 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
@@ -37,16 +37,6 @@ private:
virtual ~new_worker_state()
{
- std::unique_lock<std::mutex> guard(lock);
- if (worker.joinable() && worker.get_id() != std::this_thread::get_id()) {
- lifetime.unsubscribe();
- guard.unlock();
- worker.join();
- }
- else {
- lifetime.unsubscribe();
- worker.detach();
- }
}
explicit new_worker_state(composite_subscription cs)
@@ -84,6 +74,14 @@ private:
auto expired = std::move(keepAlive->q);
if (!keepAlive->q.empty()) std::terminate();
keepAlive->wake.notify_one();
+
+ if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
+ guard.unlock();
+ keepAlive->worker.join();
+ }
+ else {
+ keepAlive->worker.detach();
+ }
});
state->worker = tf([keepAlive](){