summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp')
-rw-r--r--Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp18
1 files changed, 8 insertions, 10 deletions
diff --git a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
index c683996..e31ed55 100644
--- a/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
+++ b/Rx/v2/src/rxcpp/schedulers/rx-newthread.hpp
@@ -37,16 +37,6 @@ private:
virtual ~new_worker_state()
{
- std::unique_lock<std::mutex> guard(lock);
- if (worker.joinable() && worker.get_id() != std::this_thread::get_id()) {
- lifetime.unsubscribe();
- guard.unlock();
- worker.join();
- }
- else {
- lifetime.unsubscribe();
- worker.detach();
- }
}
explicit new_worker_state(composite_subscription cs)
@@ -84,6 +74,14 @@ private:
auto expired = std::move(keepAlive->q);
if (!keepAlive->q.empty()) std::terminate();
keepAlive->wake.notify_one();
+
+ if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
+ guard.unlock();
+ keepAlive->worker.join();
+ }
+ else {
+ keepAlive->worker.detach();
+ }
});
state->worker = tf([keepAlive](){